compressed & more concise structure (untested)

This commit is contained in:
17ms 2023-05-26 01:50:55 +03:00
parent 572c1e7468
commit 15579b016e
8 changed files with 343 additions and 482 deletions

117
src/client.rs Normal file
View File

@ -0,0 +1,117 @@
use std::{error::Error, net::SocketAddr, path::PathBuf};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::{
crypto::{self, Crypto},
sockets::SocketHandler,
util::{new_file, FileInfo},
};
#[derive(Clone)]
pub struct Client {
addr: SocketAddr,
key: String,
output: PathBuf,
}
impl Client {
pub fn new(addr: SocketAddr, key: String, output: PathBuf) -> Self {
Self { addr, key, output }
}
pub async fn connection(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut socket = TcpStream::connect(self.addr).await?;
let mut handler = SocketHandler::new(&mut socket);
let crypto = Crypto::new(&mut handler, true).await?;
handler.set_crypto(crypto);
if !self.authorize(&mut handler).await? {
// log: invalid access key '<self.key>'
return Ok(());
}
let metadata = self.metadata(&mut handler).await?;
self.requests(&mut handler, metadata).await?;
Ok(())
}
async fn authorize(
&self,
handler: &mut SocketHandler<'_>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let msg = self.key.as_bytes().to_vec();
handler.send(&msg).await?;
let buf = handler.recv().await?;
let msg = String::from_utf8(buf)?;
let msg = msg.trim();
if msg == "DISCONNECT" {
return Ok(false);
}
Ok(true)
}
async fn metadata(
&self,
handler: &mut SocketHandler<'_>,
) -> Result<Vec<FileInfo>, Box<dyn Error + Send + Sync>> {
let buf = handler.recv().await?;
let amt = String::from_utf8(buf.clone())?.parse::<usize>()?;
handler.send(&buf).await?; // confirmation
let mut metadata = Vec::new();
while metadata.len() < amt {
let buf = handler.recv().await?;
let data = String::from_utf8(buf)?;
let split = data.split(':').collect::<Vec<&str>>();
let name = split[0].trim().to_string();
let size = split[1].trim().parse::<u64>()?;
let hash = split[2].trim().to_string();
let info = FileInfo::new(name, size, hash);
metadata.push(info);
}
Ok(metadata)
}
async fn requests(
&self,
handler: &mut SocketHandler<'_>,
metadata: Vec<FileInfo>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
for file in metadata {
let (mut handle, path) = new_file(self.output.clone(), &file.name).await?;
let msg = file.hash.as_bytes().to_vec();
handler.send(&msg).await?;
// log: downloading file to <path>
let mut remaining = file.size;
while remaining != 0 {
let buf = handler.recv().await?;
handle.write_all(&buf).await?;
handle.flush().await?;
remaining -= buf.len() as u64;
}
let check_hash = crypto::try_hash(&path)?;
let msg = check_hash.as_bytes().to_vec();
handler.send(&msg).await?;
if check_hash != file.hash {
return Err("Unsuccessful file transfer, hashes don't match".into());
} // else: log that the transfer was successful
}
Ok(())
}
}

View File

@ -1,231 +0,0 @@
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf};
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
net::TcpStream,
sync::mpsc,
};
use crate::{
common::{Connection, Message},
comms, crypto,
};
#[derive(Debug)]
pub struct Request {
pub name: String,
pub size: u64,
pub hash: String,
}
impl Request {
pub fn new(name: String, metadata: &HashMap<String, (u64, String)>) -> Option<Self> {
let (size, hash) = metadata.get(&name)?.clone();
Some(Self { name, size, hash })
}
}
#[derive(Debug, Clone)]
pub struct Connector {
target_addr: SocketAddr,
access_key: String,
output_path: PathBuf,
}
impl Connector {
pub fn new(target_addr: SocketAddr, access_key: String, output_path: PathBuf) -> Self {
Self {
target_addr,
access_key,
output_path,
}
}
pub async fn connect(
self,
tx: mpsc::Sender<Message>,
mut rx: mpsc::Receiver<Message>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut socket = TcpStream::connect(self.target_addr).await?;
let mut connection = Connection::new(&mut socket).await?;
self.authorize(&mut connection).await?;
let metadata = self.metadata(&mut connection).await?;
tx.send(Message::Metadata(metadata.clone())).await?;
self.request_handler(&mut connection, &mut rx, &metadata)
.await?;
let msg = b"FIN".to_vec();
comms::send(
&mut connection.writer,
Some(&mut connection.cipher),
Some(&mut connection.rng),
&msg,
)
.await?;
Ok(())
}
async fn authorize(
&self,
conn: &mut Connection<'_>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let msg = self.access_key.to_string().as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg == "FIN" {
return Err("Incorrect access key".into());
}
Ok(())
}
async fn metadata(
&self,
conn: &mut Connection<'_>,
) -> Result<HashMap<String, (u64, String)>, Box<dyn Error + Send + Sync>> {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let amt: usize = String::from_utf8(buf)?.parse()?;
let msg = b"AMT".to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let mut metadata = HashMap::new();
while metadata.len() < amt {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
let split: Vec<&str> = msg.split(':').collect();
let name = split[0].trim().to_string();
let size: u64 = split[1].trim().parse()?;
let hash = split[2].trim().to_string();
metadata.insert(name, (size, hash));
}
Ok(metadata)
}
async fn new_handle(
&self,
filename: &str,
) -> Result<(BufWriter<File>, PathBuf), Box<dyn Error + Send + Sync>> {
let mut path = self.output_path.clone();
path.push(filename);
let filehandle = File::create(&path).await?;
Ok((BufWriter::new(filehandle), path))
}
async fn request(
&self,
conn: &mut Connection<'_>,
req: Request,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let (mut handle, path) = self.new_handle(&req.name).await?;
let msg = req.hash.as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let mut remaining = req.size;
while remaining != 0 {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
handle.write_all(&buf).await?;
handle.flush().await?;
remaining -= buf.len() as u64;
}
let new_hash = crypto::try_hash(&path)?;
let msg: Vec<u8> = if new_hash == req.hash {
b"OK".to_vec()
} else {
b"ERROR".to_vec()
};
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
Ok(true)
}
async fn request_handler(
&self,
conn: &mut Connection<'_>,
rx: &mut mpsc::Receiver<Message>,
metadata: &HashMap<String, (u64, String)>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let msg = rx.recv().await;
if msg.is_none() {
continue;
}
match self.msg_handler(msg.unwrap(), conn, metadata).await {
Ok(true) => continue,
Ok(false) => break,
Err(e) => return Err(e),
}
}
Ok(())
}
async fn msg_handler(
&self,
msg: Message,
conn: &mut Connection<'_>,
metadata: &HashMap<String, (u64, String)>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
match msg {
Message::ClientReq(name) => {
let req = Request::new(name, metadata).unwrap();
self.request(conn, req).await?;
Ok(true)
}
Message::Shutdown => {
let msg = b"DISCONNECT".to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
Ok(false)
}
_ => Ok(true),
}
}
}

View File

@ -41,11 +41,11 @@ impl Crypto {
let msg = own_pbk.as_bytes().to_vec(); let msg = own_pbk.as_bytes().to_vec();
if go_first { if go_first {
handler.send(&msg).await?; handler.send_raw(&msg).await?;
buf = handler.recv().await?; buf = handler.recv_raw().await?;
} else { } else {
buf = handler.recv().await?; buf = handler.recv_raw().await?;
handler.send(&msg).await?; handler.send_raw(&msg).await?;
} }
let slice: [u8; DH_PBK_SIZE] = buf[..DH_PBK_SIZE].try_into()?; let slice: [u8; DH_PBK_SIZE] = buf[..DH_PBK_SIZE].try_into()?;

View File

@ -1,7 +1,7 @@
pub mod cli; pub mod cli;
pub mod connector; pub mod client;
pub mod crypto; pub mod crypto;
pub mod listener;
pub mod parser; pub mod parser;
pub mod server;
pub mod sockets; pub mod sockets;
pub mod util; pub mod util;

View File

@ -1,236 +0,0 @@
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::{
fs::File,
io::AsyncReadExt,
net::{TcpListener, TcpStream},
sync::mpsc,
};
use crate::{
common::{Connection, Message},
comms, crypto,
};
#[derive(Debug, Clone)]
pub struct Listener {
host_addr: SocketAddr,
access_key: String,
chunksize: usize,
}
// TODO: impl Drop (?)
impl Listener {
pub fn new(
host_addr: SocketAddr,
access_key: String,
chunksize: usize,
) -> Result<Arc<Self>, Box<dyn Error>> {
Ok(Arc::new(Self {
host_addr,
access_key,
chunksize,
}))
}
pub async fn start(
self: Arc<Self>,
tx: mpsc::Sender<Message>,
mut kill: mpsc::Receiver<Message>,
files: Vec<PathBuf>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
tokio::select! {
_ = self.listen(tx, files) => Ok(()),
_ = kill.recv() => Ok(()),
}
}
async fn listen(
self: Arc<Self>,
tx: mpsc::Sender<Message>,
files: Vec<PathBuf>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let listener = TcpListener::bind(self.host_addr).await?;
loop {
let files = files.clone();
let (mut socket, addr) = listener.accept().await?;
tx.send(Message::ClientConnect(addr)).await?;
let conn_tx = tx.clone();
let err_tx = tx.clone();
let conn_self = Arc::clone(&self);
match tokio::spawn(async move {
conn_self
.connection(&mut socket, addr, conn_tx, &files)
.await
})
.await
{
Ok(_) => {}
Err(e) => {
let err_msg = format!("{}: {}", addr, e);
err_tx.send(Message::Error(err_msg)).await?;
}
};
}
}
async fn connection(
&self,
socket: &mut TcpStream,
addr: SocketAddr,
tx: mpsc::Sender<Message>,
files: &Vec<PathBuf>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut connection = Connection::new(socket).await?;
if !self.authorize(&mut connection).await? {
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
let index = self.metadata_handler(&mut connection, files).await?;
tx.send(Message::ConnectionReady).await?;
self.request_handler(&mut connection, &index).await?;
tx.send(Message::ClientDisconnect(addr)).await?;
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
async fn authorize(
&self,
conn: &mut Connection<'_>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let key = String::from_utf8(buf)?;
let msg: Vec<u8>;
let res: bool;
if key != self.access_key {
res = false;
msg = b"DISCONNECT".to_vec();
} else {
res = true;
msg = b"OK".to_vec();
}
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
Ok(res)
}
async fn metadata(
&self,
files: &Vec<PathBuf>,
) -> Result<
(usize, Vec<(String, u64, String)>, HashMap<String, PathBuf>),
Box<dyn Error + Send + Sync>,
> {
let mut metadata: Vec<(String, u64, String)> = Vec::new();
let mut index = HashMap::new();
for path in files {
let split: Vec<&str> = path.to_str().unwrap().split('/').collect();
let name = split[split.len() - 1].to_string();
let handle = File::open(path).await?;
let size = handle.metadata().await?.len();
let hash = crypto::try_hash(path)?;
if size > 0 {
metadata.push((name, size, hash.clone()));
index.insert(hash, path.clone());
}
}
Ok((metadata.len(), metadata, index))
}
async fn metadata_handler(
&self,
conn: &mut Connection<'_>,
files: &Vec<PathBuf>,
) -> Result<HashMap<String, PathBuf>, Box<dyn Error + Send + Sync>> {
let (amt, metadata, index) = self.metadata(files).await?;
let msg = amt.to_string().as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg != "AMT" {
return Err("Broken message sequence".into());
}
for file in metadata {
let msg = format!("{}:{}:{}", file.0, file.1, file.2)
.as_bytes()
.to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
}
Ok(index)
}
async fn request_handler(
&self,
conn: &mut Connection<'_>,
index: &HashMap<String, PathBuf>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let cmd = String::from_utf8(buf)?;
if cmd == "DISCONNECT" {
break;
}
let mut file = File::open(index[&cmd].clone()).await?;
let mut remaining = file.metadata().await?.len();
let mut send_buf = vec![0u8; self.chunksize];
while remaining != 0 {
let n = file.read(&mut send_buf).await?;
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&send_buf[..n].to_vec(),
)
.await?;
remaining -= n as u64;
}
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg == "ERROR" {
return Err("Incomplete file request (hashes don't match)".into());
}
}
Ok(())
}
}

162
src/server.rs Normal file
View File

@ -0,0 +1,162 @@
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::{
fs::File,
io::AsyncReadExt,
net::{TcpListener, TcpStream},
sync::mpsc,
};
use crate::{crypto::Crypto, sockets::SocketHandler, util::FileInfo};
#[derive(Clone)]
pub struct Server {
addr: SocketAddr,
key: String,
chunksize: usize,
metadata: Vec<FileInfo>,
index: HashMap<String, PathBuf>,
}
impl Server {
pub fn new(
addr: SocketAddr,
key: String,
chunksize: usize,
metadata: Vec<FileInfo>,
index: HashMap<String, PathBuf>,
) -> Arc<Self> {
Arc::new(Self {
addr,
key,
chunksize,
metadata,
index,
})
}
pub async fn start(
self: Arc<Self>,
mut kill: mpsc::Receiver<()>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
tokio::select! {
_ = self.listen() => Ok(()),
_ = kill.recv() => Ok(()),
}
}
async fn listen(self: Arc<Self>) -> Result<(), Box<dyn Error + Send + Sync>> {
let listener = TcpListener::bind(self.addr).await?;
loop {
let this_self = self.clone();
let (mut socket, addr) = listener.accept().await?;
// log: new client connected: <addr>
match tokio::spawn(async move { this_self.connection(&mut socket).await }).await {
Ok(_) => {}
Err(e) => eprintln!("Error during connection ({}): {}", addr, e),
};
}
}
async fn connection(&self, socket: &mut TcpStream) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut handler = SocketHandler::new(socket);
let crypto = Crypto::new(&mut handler, true).await?;
handler.set_crypto(crypto);
if !self.authorize(&mut handler).await? {
return Ok(());
}
self.metadata(&mut handler).await?;
self.requests(&mut handler).await?;
Ok(())
}
async fn authorize(
&self,
handler: &mut SocketHandler<'_>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let buf = handler.recv().await?;
let key = String::from_utf8(buf)?;
let is_valid: bool;
let res_msg: Vec<u8>;
if key != self.key {
is_valid = false;
res_msg = b"DISCONNECT".to_vec();
} else {
is_valid = true;
res_msg = b"VALID".to_vec();
}
handler.send(&res_msg).await?;
Ok(is_valid)
}
async fn metadata(
&self,
handler: &mut SocketHandler<'_>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let amt = self.metadata.len();
let msg = amt.to_string().as_bytes().to_vec();
handler.send(&msg).await?;
let buf = handler.recv().await?;
let res_amt = String::from_utf8(buf)?.trim().parse::<usize>()?;
if res_amt != amt {
return Err("Broken message sequence during metadata exchange".into());
}
for file in &self.metadata {
let msg = format!("{}:{}:{}", file.name, file.size, file.hash)
.as_bytes()
.to_vec();
handler.send(&msg).await?;
}
Ok(())
}
async fn requests(
&self,
handler: &mut SocketHandler<'_>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let buf = handler.recv().await?;
let hash = String::from_utf8(buf)?;
let hash = hash.trim();
if hash == "DISCONNECT" {
break;
}
let mut file = File::open(self.index[hash].clone()).await?;
let mut remaining = file.metadata().await?.len();
let mut sendbuf = vec![0u8; self.chunksize];
while remaining != 0 {
let n = file.read(&mut sendbuf).await?;
handler.send(&sendbuf[..n].to_vec()).await?;
remaining -= n as u64;
}
let buf = handler.recv().await?;
let confirmation = String::from_utf8(buf)?;
let confirmation = confirmation.trim();
if confirmation != hash {
return Err("Unsuccessful file transfer, hashes don't match".into());
}
}
Ok(())
}
}

View File

@ -36,26 +36,26 @@ impl<'a> SocketHandler<'a> {
self.crypto = Some(crypto); self.crypto = Some(crypto);
} }
pub async fn sender(&mut self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> { pub async fn send(&mut self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
let data = match &self.crypto { let data = match &self.crypto {
Some(c) => c.encrypt(data).await?, Some(c) => c.encrypt(data).await?,
None => data.to_vec(), None => data.to_vec(),
}; };
self.send(&data).await?; self.send_raw(&data).await?;
Ok(()) Ok(())
} }
pub async fn send(&mut self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> { pub async fn send_raw(&mut self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
self.writer.write_all(data).await?; self.writer.write_all(data).await?;
self.writer.flush().await?; self.writer.flush().await?;
Ok(()) Ok(())
} }
pub async fn receiver(&mut self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> { pub async fn recv(&mut self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let mut buf = self.recv().await?; let mut buf = self.recv_raw().await?;
buf.pop(); buf.pop();
buf = general_purpose::STANDARD_NO_PAD.decode(&buf)?.to_vec(); buf = general_purpose::STANDARD_NO_PAD.decode(&buf)?.to_vec();
@ -67,7 +67,7 @@ impl<'a> SocketHandler<'a> {
Ok(data) Ok(data)
} }
pub async fn recv(&mut self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> { pub async fn recv_raw(&mut self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let mut buf = Vec::new(); let mut buf = Vec::new();
let n = self.reader.read_until(b':', &mut buf).await?; let n = self.reader.read_until(b':', &mut buf).await?;

View File

@ -1,4 +1,8 @@
use std::{error::Error, fs, net::SocketAddr, path::PathBuf}; use std::{collections::HashMap, error::Error, fs, net::SocketAddr, path::PathBuf};
use tokio::{fs::File, io::BufWriter};
use crate::crypto;
const PUBLIC_IPV4: &str = "https://ipinfo.io/ip"; const PUBLIC_IPV4: &str = "https://ipinfo.io/ip";
const PUBLIC_IPV6: &str = "https://ipv6.icanhazip.com"; const PUBLIC_IPV6: &str = "https://ipv6.icanhazip.com";
@ -15,9 +19,9 @@ impl Ip {
let addr = match self { let addr = match self {
Ip::V4 => PUBLIC_IPV4, Ip::V4 => PUBLIC_IPV4,
Ip::V6 => PUBLIC_IPV6, Ip::V6 => PUBLIC_IPV6,
Local => { Ip::Local => {
let addr_str = format!("127.0.0.1:{}", port); let addr_str = format!("127.0.0.1:{}", port);
return addr_str.parse::<SocketAddr>(); return Ok(addr_str.parse::<SocketAddr>()?);
} }
}; };
@ -28,6 +32,19 @@ impl Ip {
} }
} }
#[derive(Clone)]
pub struct FileInfo {
pub name: String,
pub size: u64,
pub hash: String,
}
impl FileInfo {
pub fn new(name: String, size: u64, hash: String) -> Self {
Self { name, size, hash }
}
}
fn filepaths( fn filepaths(
infile: Option<PathBuf>, infile: Option<PathBuf>,
files: Option<Vec<PathBuf>>, files: Option<Vec<PathBuf>>,
@ -49,3 +66,35 @@ fn filepaths(
Ok(filepaths) Ok(filepaths)
} }
pub async fn metadata(
files: &Vec<PathBuf>,
) -> Result<(Vec<(String, u64, String)>, HashMap<String, PathBuf>), Box<dyn Error + Send + Sync>> {
let mut metadata = Vec::new();
let mut index = HashMap::new();
for path in files {
let split = path.to_str().unwrap().split('/').collect::<Vec<&str>>();
let name = split[split.len() - 1].to_string();
let handle = File::open(path).await?;
let size = handle.metadata().await?.len();
let hash = crypto::try_hash(path)?;
if size > 0 {
metadata.push((name, size, hash.clone()));
index.insert(hash, path.clone());
}
}
Ok((metadata, index))
}
pub async fn new_file(
mut path: PathBuf,
name: &str,
) -> Result<(BufWriter<File>, PathBuf), Box<dyn Error + Send + Sync>> {
path.push(name);
let handle = File::create(&path).await?;
Ok((BufWriter::new(handle), path))
}