From 15579b016e439dddc2ab5a97fd4dbf4bcaa99f15 Mon Sep 17 00:00:00 2001 From: 17ms <79069176+17ms@users.noreply.github.com> Date: Fri, 26 May 2023 01:50:55 +0300 Subject: [PATCH] compressed & more concise structure (untested) --- src/client.rs | 117 +++++++++++++++++++++++ src/connector.rs | 231 ---------------------------------------------- src/crypto.rs | 8 +- src/lib.rs | 4 +- src/listener.rs | 236 ----------------------------------------------- src/server.rs | 162 ++++++++++++++++++++++++++++++++ src/sockets.rs | 12 +-- src/util.rs | 55 ++++++++++- 8 files changed, 343 insertions(+), 482 deletions(-) create mode 100644 src/client.rs delete mode 100755 src/connector.rs delete mode 100755 src/listener.rs create mode 100644 src/server.rs diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..8a93e5f --- /dev/null +++ b/src/client.rs @@ -0,0 +1,117 @@ +use std::{error::Error, net::SocketAddr, path::PathBuf}; + +use tokio::{io::AsyncWriteExt, net::TcpStream}; + +use crate::{ + crypto::{self, Crypto}, + sockets::SocketHandler, + util::{new_file, FileInfo}, +}; + +#[derive(Clone)] +pub struct Client { + addr: SocketAddr, + key: String, + output: PathBuf, +} + +impl Client { + pub fn new(addr: SocketAddr, key: String, output: PathBuf) -> Self { + Self { addr, key, output } + } + + pub async fn connection(&self) -> Result<(), Box> { + let mut socket = TcpStream::connect(self.addr).await?; + let mut handler = SocketHandler::new(&mut socket); + let crypto = Crypto::new(&mut handler, true).await?; + handler.set_crypto(crypto); + + if !self.authorize(&mut handler).await? { + // log: invalid access key '' + return Ok(()); + } + + let metadata = self.metadata(&mut handler).await?; + self.requests(&mut handler, metadata).await?; + + Ok(()) + } + + async fn authorize( + &self, + handler: &mut SocketHandler<'_>, + ) -> Result> { + let msg = self.key.as_bytes().to_vec(); + handler.send(&msg).await?; + + let buf = handler.recv().await?; + let msg = String::from_utf8(buf)?; + let msg = msg.trim(); + + if msg == "DISCONNECT" { + return Ok(false); + } + + Ok(true) + } + + async fn metadata( + &self, + handler: &mut SocketHandler<'_>, + ) -> Result, Box> { + let buf = handler.recv().await?; + let amt = String::from_utf8(buf.clone())?.parse::()?; + handler.send(&buf).await?; // confirmation + + let mut metadata = Vec::new(); + + while metadata.len() < amt { + let buf = handler.recv().await?; + let data = String::from_utf8(buf)?; + + let split = data.split(':').collect::>(); + let name = split[0].trim().to_string(); + let size = split[1].trim().parse::()?; + let hash = split[2].trim().to_string(); + + let info = FileInfo::new(name, size, hash); + + metadata.push(info); + } + + Ok(metadata) + } + + async fn requests( + &self, + handler: &mut SocketHandler<'_>, + metadata: Vec, + ) -> Result<(), Box> { + for file in metadata { + let (mut handle, path) = new_file(self.output.clone(), &file.name).await?; + let msg = file.hash.as_bytes().to_vec(); + handler.send(&msg).await?; + + // log: downloading file to + + let mut remaining = file.size; + + while remaining != 0 { + let buf = handler.recv().await?; + handle.write_all(&buf).await?; + handle.flush().await?; + remaining -= buf.len() as u64; + } + + let check_hash = crypto::try_hash(&path)?; + let msg = check_hash.as_bytes().to_vec(); + handler.send(&msg).await?; + + if check_hash != file.hash { + return Err("Unsuccessful file transfer, hashes don't match".into()); + } // else: log that the transfer was successful + } + + Ok(()) + } +} diff --git a/src/connector.rs b/src/connector.rs deleted file mode 100755 index 271c86e..0000000 --- a/src/connector.rs +++ /dev/null @@ -1,231 +0,0 @@ -use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf}; - -use tokio::{ - fs::File, - io::{AsyncWriteExt, BufWriter}, - net::TcpStream, - sync::mpsc, -}; - -use crate::{ - common::{Connection, Message}, - comms, crypto, -}; - -#[derive(Debug)] -pub struct Request { - pub name: String, - pub size: u64, - pub hash: String, -} - -impl Request { - pub fn new(name: String, metadata: &HashMap) -> Option { - let (size, hash) = metadata.get(&name)?.clone(); - Some(Self { name, size, hash }) - } -} - -#[derive(Debug, Clone)] -pub struct Connector { - target_addr: SocketAddr, - access_key: String, - output_path: PathBuf, -} - -impl Connector { - pub fn new(target_addr: SocketAddr, access_key: String, output_path: PathBuf) -> Self { - Self { - target_addr, - access_key, - output_path, - } - } - - pub async fn connect( - self, - tx: mpsc::Sender, - mut rx: mpsc::Receiver, - ) -> Result<(), Box> { - let mut socket = TcpStream::connect(self.target_addr).await?; - let mut connection = Connection::new(&mut socket).await?; - - self.authorize(&mut connection).await?; - let metadata = self.metadata(&mut connection).await?; - tx.send(Message::Metadata(metadata.clone())).await?; - self.request_handler(&mut connection, &mut rx, &metadata) - .await?; - - let msg = b"FIN".to_vec(); - comms::send( - &mut connection.writer, - Some(&mut connection.cipher), - Some(&mut connection.rng), - &msg, - ) - .await?; - - Ok(()) - } - - async fn authorize( - &self, - conn: &mut Connection<'_>, - ) -> Result<(), Box> { - let msg = self.access_key.to_string().as_bytes().to_vec(); - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let msg = String::from_utf8(buf)?; - - if msg == "FIN" { - return Err("Incorrect access key".into()); - } - - Ok(()) - } - - async fn metadata( - &self, - conn: &mut Connection<'_>, - ) -> Result, Box> { - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let amt: usize = String::from_utf8(buf)?.parse()?; - - let msg = b"AMT".to_vec(); - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - let mut metadata = HashMap::new(); - - while metadata.len() < amt { - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let msg = String::from_utf8(buf)?; - - let split: Vec<&str> = msg.split(':').collect(); - let name = split[0].trim().to_string(); - let size: u64 = split[1].trim().parse()?; - let hash = split[2].trim().to_string(); - - metadata.insert(name, (size, hash)); - } - - Ok(metadata) - } - - async fn new_handle( - &self, - filename: &str, - ) -> Result<(BufWriter, PathBuf), Box> { - let mut path = self.output_path.clone(); - path.push(filename); - let filehandle = File::create(&path).await?; - - Ok((BufWriter::new(filehandle), path)) - } - - async fn request( - &self, - conn: &mut Connection<'_>, - req: Request, - ) -> Result> { - let (mut handle, path) = self.new_handle(&req.name).await?; - let msg = req.hash.as_bytes().to_vec(); - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - let mut remaining = req.size; - - while remaining != 0 { - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - handle.write_all(&buf).await?; - handle.flush().await?; - remaining -= buf.len() as u64; - } - - let new_hash = crypto::try_hash(&path)?; - - let msg: Vec = if new_hash == req.hash { - b"OK".to_vec() - } else { - b"ERROR".to_vec() - }; - - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - Ok(true) - } - - async fn request_handler( - &self, - conn: &mut Connection<'_>, - rx: &mut mpsc::Receiver, - metadata: &HashMap, - ) -> Result<(), Box> { - loop { - let msg = rx.recv().await; - if msg.is_none() { - continue; - } - - match self.msg_handler(msg.unwrap(), conn, metadata).await { - Ok(true) => continue, - Ok(false) => break, - Err(e) => return Err(e), - } - } - - Ok(()) - } - - async fn msg_handler( - &self, - msg: Message, - conn: &mut Connection<'_>, - metadata: &HashMap, - ) -> Result> { - match msg { - Message::ClientReq(name) => { - let req = Request::new(name, metadata).unwrap(); - self.request(conn, req).await?; - - Ok(true) - } - Message::Shutdown => { - let msg = b"DISCONNECT".to_vec(); - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - Ok(false) - } - _ => Ok(true), - } - } -} diff --git a/src/crypto.rs b/src/crypto.rs index 233b974..3de7917 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -41,11 +41,11 @@ impl Crypto { let msg = own_pbk.as_bytes().to_vec(); if go_first { - handler.send(&msg).await?; - buf = handler.recv().await?; + handler.send_raw(&msg).await?; + buf = handler.recv_raw().await?; } else { - buf = handler.recv().await?; - handler.send(&msg).await?; + buf = handler.recv_raw().await?; + handler.send_raw(&msg).await?; } let slice: [u8; DH_PBK_SIZE] = buf[..DH_PBK_SIZE].try_into()?; diff --git a/src/lib.rs b/src/lib.rs index 6853227..266c5b4 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod cli; -pub mod connector; +pub mod client; pub mod crypto; -pub mod listener; pub mod parser; +pub mod server; pub mod sockets; pub mod util; diff --git a/src/listener.rs b/src/listener.rs deleted file mode 100755 index 49d1366..0000000 --- a/src/listener.rs +++ /dev/null @@ -1,236 +0,0 @@ -use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc}; - -use tokio::{ - fs::File, - io::AsyncReadExt, - net::{TcpListener, TcpStream}, - sync::mpsc, -}; - -use crate::{ - common::{Connection, Message}, - comms, crypto, -}; - -#[derive(Debug, Clone)] -pub struct Listener { - host_addr: SocketAddr, - access_key: String, - chunksize: usize, -} - -// TODO: impl Drop (?) - -impl Listener { - pub fn new( - host_addr: SocketAddr, - access_key: String, - chunksize: usize, - ) -> Result, Box> { - Ok(Arc::new(Self { - host_addr, - access_key, - chunksize, - })) - } - - pub async fn start( - self: Arc, - tx: mpsc::Sender, - mut kill: mpsc::Receiver, - files: Vec, - ) -> Result<(), Box> { - tokio::select! { - _ = self.listen(tx, files) => Ok(()), - _ = kill.recv() => Ok(()), - } - } - - async fn listen( - self: Arc, - tx: mpsc::Sender, - files: Vec, - ) -> Result<(), Box> { - let listener = TcpListener::bind(self.host_addr).await?; - - loop { - let files = files.clone(); - let (mut socket, addr) = listener.accept().await?; - tx.send(Message::ClientConnect(addr)).await?; - let conn_tx = tx.clone(); - let err_tx = tx.clone(); - let conn_self = Arc::clone(&self); - - match tokio::spawn(async move { - conn_self - .connection(&mut socket, addr, conn_tx, &files) - .await - }) - .await - { - Ok(_) => {} - Err(e) => { - let err_msg = format!("{}: {}", addr, e); - err_tx.send(Message::Error(err_msg)).await?; - } - }; - } - } - - async fn connection( - &self, - socket: &mut TcpStream, - addr: SocketAddr, - tx: mpsc::Sender, - files: &Vec, - ) -> Result<(), Box> { - let mut connection = Connection::new(socket).await?; - - if !self.authorize(&mut connection).await? { - return Ok::<(), Box>(()); - } - - let index = self.metadata_handler(&mut connection, files).await?; - tx.send(Message::ConnectionReady).await?; - self.request_handler(&mut connection, &index).await?; - tx.send(Message::ClientDisconnect(addr)).await?; - - Ok::<(), Box>(()) - } - - async fn authorize( - &self, - conn: &mut Connection<'_>, - ) -> Result> { - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let key = String::from_utf8(buf)?; - let msg: Vec; - let res: bool; - - if key != self.access_key { - res = false; - msg = b"DISCONNECT".to_vec(); - } else { - res = true; - msg = b"OK".to_vec(); - } - - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - Ok(res) - } - - async fn metadata( - &self, - files: &Vec, - ) -> Result< - (usize, Vec<(String, u64, String)>, HashMap), - Box, - > { - let mut metadata: Vec<(String, u64, String)> = Vec::new(); - let mut index = HashMap::new(); - - for path in files { - let split: Vec<&str> = path.to_str().unwrap().split('/').collect(); - let name = split[split.len() - 1].to_string(); - let handle = File::open(path).await?; - let size = handle.metadata().await?.len(); - let hash = crypto::try_hash(path)?; - - if size > 0 { - metadata.push((name, size, hash.clone())); - index.insert(hash, path.clone()); - } - } - - Ok((metadata.len(), metadata, index)) - } - - async fn metadata_handler( - &self, - conn: &mut Connection<'_>, - files: &Vec, - ) -> Result, Box> { - let (amt, metadata, index) = self.metadata(files).await?; - let msg = amt.to_string().as_bytes().to_vec(); - - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let msg = String::from_utf8(buf)?; - - if msg != "AMT" { - return Err("Broken message sequence".into()); - } - - for file in metadata { - let msg = format!("{}:{}:{}", file.0, file.1, file.2) - .as_bytes() - .to_vec(); - - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &msg, - ) - .await?; - } - - Ok(index) - } - - async fn request_handler( - &self, - conn: &mut Connection<'_>, - index: &HashMap, - ) -> Result<(), Box> { - loop { - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let cmd = String::from_utf8(buf)?; - - if cmd == "DISCONNECT" { - break; - } - - let mut file = File::open(index[&cmd].clone()).await?; - let mut remaining = file.metadata().await?.len(); - let mut send_buf = vec![0u8; self.chunksize]; - - while remaining != 0 { - let n = file.read(&mut send_buf).await?; - - comms::send( - &mut conn.writer, - Some(&mut conn.cipher), - Some(&mut conn.rng), - &send_buf[..n].to_vec(), - ) - .await?; - - remaining -= n as u64; - } - - let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?; - let msg = String::from_utf8(buf)?; - - if msg == "ERROR" { - return Err("Incomplete file request (hashes don't match)".into()); - } - } - - Ok(()) - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..b5842b4 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,162 @@ +use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc}; + +use tokio::{ + fs::File, + io::AsyncReadExt, + net::{TcpListener, TcpStream}, + sync::mpsc, +}; + +use crate::{crypto::Crypto, sockets::SocketHandler, util::FileInfo}; + +#[derive(Clone)] +pub struct Server { + addr: SocketAddr, + key: String, + chunksize: usize, + metadata: Vec, + index: HashMap, +} + +impl Server { + pub fn new( + addr: SocketAddr, + key: String, + chunksize: usize, + metadata: Vec, + index: HashMap, + ) -> Arc { + Arc::new(Self { + addr, + key, + chunksize, + metadata, + index, + }) + } + + pub async fn start( + self: Arc, + mut kill: mpsc::Receiver<()>, + ) -> Result<(), Box> { + tokio::select! { + _ = self.listen() => Ok(()), + _ = kill.recv() => Ok(()), + } + } + + async fn listen(self: Arc) -> Result<(), Box> { + let listener = TcpListener::bind(self.addr).await?; + + loop { + let this_self = self.clone(); + let (mut socket, addr) = listener.accept().await?; + + // log: new client connected: + + match tokio::spawn(async move { this_self.connection(&mut socket).await }).await { + Ok(_) => {} + Err(e) => eprintln!("Error during connection ({}): {}", addr, e), + }; + } + } + + async fn connection(&self, socket: &mut TcpStream) -> Result<(), Box> { + let mut handler = SocketHandler::new(socket); + let crypto = Crypto::new(&mut handler, true).await?; + handler.set_crypto(crypto); + + if !self.authorize(&mut handler).await? { + return Ok(()); + } + + self.metadata(&mut handler).await?; + self.requests(&mut handler).await?; + + Ok(()) + } + + async fn authorize( + &self, + handler: &mut SocketHandler<'_>, + ) -> Result> { + let buf = handler.recv().await?; + let key = String::from_utf8(buf)?; + + let is_valid: bool; + let res_msg: Vec; + + if key != self.key { + is_valid = false; + res_msg = b"DISCONNECT".to_vec(); + } else { + is_valid = true; + res_msg = b"VALID".to_vec(); + } + + handler.send(&res_msg).await?; + + Ok(is_valid) + } + + async fn metadata( + &self, + handler: &mut SocketHandler<'_>, + ) -> Result<(), Box> { + let amt = self.metadata.len(); + let msg = amt.to_string().as_bytes().to_vec(); + + handler.send(&msg).await?; + + let buf = handler.recv().await?; + let res_amt = String::from_utf8(buf)?.trim().parse::()?; + + if res_amt != amt { + return Err("Broken message sequence during metadata exchange".into()); + } + + for file in &self.metadata { + let msg = format!("{}:{}:{}", file.name, file.size, file.hash) + .as_bytes() + .to_vec(); + handler.send(&msg).await?; + } + + Ok(()) + } + + async fn requests( + &self, + handler: &mut SocketHandler<'_>, + ) -> Result<(), Box> { + loop { + let buf = handler.recv().await?; + let hash = String::from_utf8(buf)?; + let hash = hash.trim(); + + if hash == "DISCONNECT" { + break; + } + + let mut file = File::open(self.index[hash].clone()).await?; + let mut remaining = file.metadata().await?.len(); + let mut sendbuf = vec![0u8; self.chunksize]; + + while remaining != 0 { + let n = file.read(&mut sendbuf).await?; + handler.send(&sendbuf[..n].to_vec()).await?; + remaining -= n as u64; + } + + let buf = handler.recv().await?; + let confirmation = String::from_utf8(buf)?; + let confirmation = confirmation.trim(); + + if confirmation != hash { + return Err("Unsuccessful file transfer, hashes don't match".into()); + } + } + + Ok(()) + } +} diff --git a/src/sockets.rs b/src/sockets.rs index 0b4d3b5..f98490f 100644 --- a/src/sockets.rs +++ b/src/sockets.rs @@ -36,26 +36,26 @@ impl<'a> SocketHandler<'a> { self.crypto = Some(crypto); } - pub async fn sender(&mut self, data: &[u8]) -> Result<(), Box> { + pub async fn send(&mut self, data: &[u8]) -> Result<(), Box> { let data = match &self.crypto { Some(c) => c.encrypt(data).await?, None => data.to_vec(), }; - self.send(&data).await?; + self.send_raw(&data).await?; Ok(()) } - pub async fn send(&mut self, data: &[u8]) -> Result<(), Box> { + pub async fn send_raw(&mut self, data: &[u8]) -> Result<(), Box> { self.writer.write_all(data).await?; self.writer.flush().await?; Ok(()) } - pub async fn receiver(&mut self) -> Result, Box> { - let mut buf = self.recv().await?; + pub async fn recv(&mut self) -> Result, Box> { + let mut buf = self.recv_raw().await?; buf.pop(); buf = general_purpose::STANDARD_NO_PAD.decode(&buf)?.to_vec(); @@ -67,7 +67,7 @@ impl<'a> SocketHandler<'a> { Ok(data) } - pub async fn recv(&mut self) -> Result, Box> { + pub async fn recv_raw(&mut self) -> Result, Box> { let mut buf = Vec::new(); let n = self.reader.read_until(b':', &mut buf).await?; diff --git a/src/util.rs b/src/util.rs index 16abb5b..75c6ef0 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,8 @@ -use std::{error::Error, fs, net::SocketAddr, path::PathBuf}; +use std::{collections::HashMap, error::Error, fs, net::SocketAddr, path::PathBuf}; + +use tokio::{fs::File, io::BufWriter}; + +use crate::crypto; const PUBLIC_IPV4: &str = "https://ipinfo.io/ip"; const PUBLIC_IPV6: &str = "https://ipv6.icanhazip.com"; @@ -15,9 +19,9 @@ impl Ip { let addr = match self { Ip::V4 => PUBLIC_IPV4, Ip::V6 => PUBLIC_IPV6, - Local => { + Ip::Local => { let addr_str = format!("127.0.0.1:{}", port); - return addr_str.parse::(); + return Ok(addr_str.parse::()?); } }; @@ -28,6 +32,19 @@ impl Ip { } } +#[derive(Clone)] +pub struct FileInfo { + pub name: String, + pub size: u64, + pub hash: String, +} + +impl FileInfo { + pub fn new(name: String, size: u64, hash: String) -> Self { + Self { name, size, hash } + } +} + fn filepaths( infile: Option, files: Option>, @@ -49,3 +66,35 @@ fn filepaths( Ok(filepaths) } + +pub async fn metadata( + files: &Vec, +) -> Result<(Vec<(String, u64, String)>, HashMap), Box> { + let mut metadata = Vec::new(); + let mut index = HashMap::new(); + + for path in files { + let split = path.to_str().unwrap().split('/').collect::>(); + let name = split[split.len() - 1].to_string(); + let handle = File::open(path).await?; + let size = handle.metadata().await?.len(); + let hash = crypto::try_hash(path)?; + + if size > 0 { + metadata.push((name, size, hash.clone())); + index.insert(hash, path.clone()); + } + } + + Ok((metadata, index)) +} + +pub async fn new_file( + mut path: PathBuf, + name: &str, +) -> Result<(BufWriter, PathBuf), Box> { + path.push(name); + let handle = File::create(&path).await?; + + Ok((BufWriter::new(handle), path)) +}