Skip to content

Commit

Permalink
More More P2P Docs (#2525)
Browse files Browse the repository at this point in the history
* Docs

* Clarify relay upgrades

* caaaaalapse

* Cleanup `sd_p2p_tunnel`
  • Loading branch information
oscartbeaumont authored May 31, 2024
1 parent 735e80a commit 58dd5c5
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 199 deletions.
1 change: 0 additions & 1 deletion core/src/custom_uri/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ pub fn base_router() -> Router<LocalState> {
request_file(
state.node.p2p.p2p.clone(),
node_identity,
&library.id,
&library.identity,
file_path_pub_id,
Range::Full,
Expand Down
34 changes: 34 additions & 0 deletions core/src/library/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
Node,
};

use futures::future::join_all;
use sd_core_sync::SyncMessage;
use sd_p2p::{Identity, RemoteIdentity};
use sd_prisma::prisma::{crdt_operation, instance, location, SortOrder};
Expand Down Expand Up @@ -382,6 +383,39 @@ impl Libraries {
self.libraries.read().await.get(library_id).cloned()
}

// will return the library context for the given instance
pub async fn get_library_for_instance(
&self,
instance: &RemoteIdentity,
) -> Option<Arc<Library>> {
join_all(
self.libraries
.read()
.await
.iter()
.map(|(_, library)| async move {
library
.db
.instance()
.find_many(vec![instance::remote_identity::equals(
instance.get_bytes().to_vec(),
)])
.exec()
.await
.ok()
.iter()
.flatten()
.filter_map(|i| RemoteIdentity::from_bytes(&i.remote_identity).ok())
.into_iter()
.any(|i| i == *instance)
.then(|| Arc::clone(library))
}),
)
.await
.into_iter()
.find_map(|v| v)
}

// get_ctx will return the library context for the given library id.
pub async fn hash_library(&self, library_id: &Uuid) -> bool {
self.libraries.read().await.get(library_id).is_some()
Expand Down
17 changes: 8 additions & 9 deletions core/src/p2p/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,23 +393,22 @@ async fn start(
}) else {
return;
};
let library_id = tunnel.library_id();

let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| {
error!("Failed `SyncMessage::from_stream`: {}", err);
}) else {
return;
};

let Ok(library) =
node.libraries
.get_library(&library_id)
.await
.ok_or_else(|| {
error!("Failed to get library '{library_id}'");
let Ok(library) = node
.libraries
.get_library_for_instance(&tunnel.library_remote_identity())
.await
.ok_or_else(|| {
error!("Failed to get library {}", tunnel.library_remote_identity());

// TODO: Respond to remote client with warning!
})
// TODO: Respond to remote client with warning!
})
else {
return;
};
Expand Down
14 changes: 4 additions & 10 deletions core/src/p2p/operations/library.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{p2p::Header, Node};
pub async fn request_file(
p2p: Arc<P2P>,
identity: RemoteIdentity,
library_id: &Uuid,
library_identity: &Identity,
file_path_id: Uuid,
range: Range,
Expand All @@ -42,7 +41,7 @@ pub async fn request_file(
)
.await?;

let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_id, library_identity).await?;
let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_identity).await?;

let block_size = BlockSize::from_stream(&mut stream).await?;
let size = stream.read_u64_le().await?;
Expand Down Expand Up @@ -82,9 +81,9 @@ pub(crate) async fn receiver(

let library = node
.libraries
.get_library(&stream.library_id())
.get_library_for_instance(&stream.library_remote_identity())
.await
.ok_or_else(|| format!("Library not found: {:?}", stream.library_id()))?;
.ok_or_else(|| format!("Library not found: {:?}", stream.library_remote_identity()))?;

let file_path = library
.db
Expand All @@ -93,12 +92,7 @@ pub(crate) async fn receiver(
.select(file_path_to_handle_p2p_serve_file::select())
.exec()
.await?
.ok_or_else(|| {
format!(
"File path {file_path_id:?} not found in {:?}",
stream.library_id()
)
})?;
.ok_or_else(|| format!("File path {file_path_id:?} not found in {:?}", library.id))?;

let location = file_path.location.as_ref().expect("included in query");
let location_path = location.path.as_ref().expect("included in query");
Expand Down
4 changes: 1 addition & 3 deletions core/src/p2p/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ mod originator {

stream.write_all(&Header::Sync.to_bytes()).await.unwrap();

let mut tunnel = Tunnel::initiator(stream, &library.id, &library.identity)
.await
.unwrap();
let mut tunnel = Tunnel::initiator(stream, &library.identity).await.unwrap();

tunnel
.write_all(&SyncMessage::NewOperations.to_bytes())
Expand Down
143 changes: 140 additions & 3 deletions crates/p2p/crates/tunnel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,143 @@
//! A system for creating encrypted tunnels between peers over untrusted connections.
mod tunnel;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

pub use sd_p2p::{Identity, IdentityErr, RemoteIdentity};
pub use tunnel::*;
use sd_p2p_proto::{decode, encode};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};

use thiserror::Error;

use sd_p2p::{Identity, IdentityErr, RemoteIdentity, UnicastStream};

#[derive(Debug, Error)]
pub enum TunnelError {
#[error("Error writing discriminator.")]
DiscriminatorWriteError,
#[error("Error reading discriminator. Is this stream actually a tunnel?")]
DiscriminatorReadError,
#[error("Invalid discriminator. Is this stream actually a tunnel?")]
InvalidDiscriminator,
#[error("Error sending library id: {0:?}")]
ErrorSendingLibraryId(io::Error),
#[error("Error receiving library identity: {0:?}")]
ErrorReceivingLibraryIdentity(decode::Error),
#[error("Error decoding library identity: {0:?}")]
ErrorDecodingLibraryIdentity(IdentityErr),
}

/// An encrypted tunnel between two libraries.
///
/// This sits on top of the existing node to node encryption provided by Quic.
///
/// It's primarily designed to avoid an attack where traffic flows:
/// node <-> attacker node <-> node
/// The attackers node can't break TLS but if they get in the middle they can present their own node identity to each side and then intercept library related traffic.
/// To avoid that we use this tunnel to encrypt all library related traffic so it can only be decoded by another instance of the same library.
#[derive(Debug)]
pub struct Tunnel {
stream: UnicastStream,
library_remote_id: RemoteIdentity,
}

impl Tunnel {
/// Create a new tunnel.
///
/// This should be used by the node that initiated the request which this tunnel is used for.
pub async fn initiator(
mut stream: UnicastStream,
library_identity: &Identity,
) -> Result<Self, TunnelError> {
stream
.write_all(&[b'T'])
.await
.map_err(|_| TunnelError::DiscriminatorWriteError)?;

let mut buf = vec![];
encode::buf(&mut buf, &library_identity.to_remote_identity().get_bytes());
stream
.write_all(&buf)
.await
.map_err(TunnelError::ErrorSendingLibraryId)?;

// TODO: Do encryption things

Ok(Self {
stream,
library_remote_id: library_identity.to_remote_identity(),
})
}

/// Create a new tunnel.
///
/// This should be used by the node that responded to the request which this tunnel is used for.
pub async fn responder(mut stream: UnicastStream) -> Result<Self, TunnelError> {
let discriminator = stream
.read_u8()
.await
.map_err(|_| TunnelError::DiscriminatorReadError)?;
if discriminator != b'T' {
return Err(TunnelError::InvalidDiscriminator);
}

// TODO: Blindly decoding this from the stream is not secure. We need a cryptographic handshake here to prove the peer on the other ends is holding the private key.
let library_remote_id = decode::buf(&mut stream)
.await
.map_err(TunnelError::ErrorReceivingLibraryIdentity)?;

let library_remote_id = RemoteIdentity::from_bytes(&library_remote_id)
.map_err(TunnelError::ErrorDecodingLibraryIdentity)?;

// TODO: Do encryption things

Ok(Self {
library_remote_id,
stream,
})
}

/// Get the `RemoteIdentity` of the peer on the other end of the tunnel.
pub fn node_remote_identity(&self) -> RemoteIdentity {
self.stream.remote_identity()
}

/// Get the `RemoteIdentity` of the library instance on the other end of the tunnel.
pub fn library_remote_identity(&self) -> RemoteIdentity {
self.library_remote_id
}
}

impl AsyncRead for Tunnel {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// TODO: Do decryption

Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
}
}

impl AsyncWrite for Tunnel {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// TODO: Do encryption

Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().stream).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
}
}
Loading

0 comments on commit 58dd5c5

Please sign in to comment.