From 24bb4246d303cfd5a76af268d4797d38ef0338ef Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Thu, 11 Jun 2026 16:07:23 +0200 Subject: [PATCH 01/12] feat: add header integrity validation and replay protection to control messages Signed-off-by: Mark Marton --- data-plane/Cargo.lock | 1 + data-plane/core/auth/src/utils.rs | 33 +++++ .../core/datapath/benches/name_benchmark.rs | 2 + .../src/api/gen/dataplane.proto.v1.rs | 6 + data-plane/core/datapath/src/header_mac.rs | 2 + data-plane/core/datapath/src/link_ecdh.rs | 2 + .../core/datapath/src/messages/utils.rs | 4 + data-plane/core/session/Cargo.toml | 1 + data-plane/core/session/src/errors.rs | 10 +- data-plane/core/session/src/mls_state.rs | 54 +++---- .../core/session/src/session_builder.rs | 1 + .../core/session/src/session_controller.rs | 134 +++++++++++++++--- data-plane/core/session/src/session_layer.rs | 5 +- .../core/session/src/session_moderator.rs | 24 +++- .../core/session/src/session_participant.rs | 22 ++- .../core/session/src/session_settings.rs | 28 ++++ proto/data-plane/v1/data_plane.proto | 3 + 17 files changed, 283 insertions(+), 49 deletions(-) diff --git a/data-plane/Cargo.lock b/data-plane/Cargo.lock index fbfda73cb..9350acef7 100644 --- a/data-plane/Cargo.lock +++ b/data-plane/Cargo.lock @@ -368,6 +368,7 @@ dependencies = [ "agntcy-slim-testing", "agntcy-slim-version", "async-trait", + "base64 0.22.1", "criterion", "display-error-chain", "futures", diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index e4c529ffc..bb23e75cf 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -31,6 +31,31 @@ pub fn generate_mls_signature_keys() -> Result<(Vec, Vec), crate::errors )) } +/// Sign the header AAD bytes using an Ed25519 private key. +pub fn sign_header_aad(aad_bytes: &[u8], private_key_bytes: &[u8]) -> Result, crate::errors::AuthError> { + use aws_lc_rs::signature::Ed25519KeyPair; + let key_pair = Ed25519KeyPair::from_seed_and_public_key( + &private_key_bytes[..32], + &private_key_bytes[32..], + ) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + let signature = key_pair.sign(aad_bytes); + Ok(signature.as_ref().to_vec()) +} + +/// Verify the header AAD signature using an Ed25519 public key. +pub fn verify_header_aad( + aad_bytes: &[u8], + signature_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result<(), crate::errors::AuthError> { + use aws_lc_rs::signature::{UnparsedPublicKey, ED25519}; + let public_key = UnparsedPublicKey::new(&ED25519, public_key_bytes); + public_key + .verify(aad_bytes, signature_bytes) + .map_err(|_| crate::errors::AuthError::TokenInvalid) +} + /// Convert arbitrary bytes into a PEM-formatted string with the provided header/footer. /// The body is wrapped at 64 character lines per RFC 7468 guidance. /// Header/footer should include the BEGIN/END lines with trailing/leading newlines as desired. @@ -58,6 +83,14 @@ pub fn bytes_to_pem(key_bytes: &[u8], header: &str, footer: &str) -> String { mod tests { use super::*; + #[test] + fn test_sign_header_aad_with_generated_keys() { + let (secret, public) = generate_mls_signature_keys().unwrap(); + let msg = b"hello"; + let sig = sign_header_aad(msg, &secret).unwrap(); + verify_header_aad(msg, &sig, &public).unwrap(); + } + #[test] fn test_bytes_to_pem_basic() { let data = b"hello world"; // base64: aGVsbG8gd29ybGQ= diff --git a/data-plane/core/datapath/benches/name_benchmark.rs b/data-plane/core/datapath/benches/name_benchmark.rs index 8209aa82d..1eab6f737 100644 --- a/data-plane/core/datapath/benches/name_benchmark.rs +++ b/data-plane/core/datapath/benches/name_benchmark.rs @@ -38,6 +38,8 @@ fn make_slim_header() -> SlimHeader { incoming_conn: None, error: None, header_mac: None, + e2e_header_sig: None, + sequence_number: None, } } diff --git a/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs b/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs index 2d8f6fa01..2d6c8a0dc 100644 --- a/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs +++ b/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs @@ -94,6 +94,10 @@ pub struct SlimHeader { pub error: ::core::option::Option, #[prost(bytes = "vec", optional, tag = "10")] pub header_mac: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(uint64, optional, tag = "11")] + pub sequence_number: ::core::option::Option, + #[prost(bytes = "vec", optional, tag = "12")] + pub e2e_header_sig: ::core::option::Option<::prost::alloc::vec::Vec>, } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct SessionHeader { @@ -309,6 +313,8 @@ pub struct HeaderIntegrityAad { pub message_id: u32, #[prost(string, tag = "9")] pub payload_type: ::prost::alloc::string::String, + #[prost(uint64, optional, tag = "10")] + pub sequence_number: ::core::option::Option, } /// Group Add Payload /// sent when a new participant is added diff --git a/data-plane/core/datapath/src/header_mac.rs b/data-plane/core/datapath/src/header_mac.rs index 8c8fdaf1d..ca475a9da 100644 --- a/data-plane/core/datapath/src/header_mac.rs +++ b/data-plane/core/datapath/src/header_mac.rs @@ -315,6 +315,8 @@ mod tests { incoming_conn: Some(999), error: Some(false), header_mac: None, + sequence_number: None, + e2e_header_sig: None, } } diff --git a/data-plane/core/datapath/src/link_ecdh.rs b/data-plane/core/datapath/src/link_ecdh.rs index 36f8382fd..7782ab35c 100644 --- a/data-plane/core/datapath/src/link_ecdh.rs +++ b/data-plane/core/datapath/src/link_ecdh.rs @@ -79,6 +79,8 @@ mod tests { incoming_conn: None, error: None, header_mac: None, + sequence_number: None, + e2e_header_sig: None, }; a.sign_slim_header(&mut h, &lid).unwrap(); b.verify_slim_header(&h, &lid).unwrap(); diff --git a/data-plane/core/datapath/src/messages/utils.rs b/data-plane/core/datapath/src/messages/utils.rs index 2198ac813..ecd5b8c9d 100644 --- a/data-plane/core/datapath/src/messages/utils.rs +++ b/data-plane/core/datapath/src/messages/utils.rs @@ -277,6 +277,8 @@ impl SlimHeader { incoming_conn: flags.incoming_conn, error: flags.error, header_mac: None, + sequence_number: None, + e2e_header_sig: None, } } @@ -2011,6 +2013,8 @@ mod tests { incoming_conn: None, error: None, header_mac: None, + sequence_number: None, + e2e_header_sig: None, }; // the operations to retrieve source and destination should fail with panic diff --git a/data-plane/core/session/Cargo.toml b/data-plane/core/session/Cargo.toml index 206d6c09d..f6cecec2c 100644 --- a/data-plane/core/session/Cargo.toml +++ b/data-plane/core/session/Cargo.toml @@ -13,6 +13,7 @@ agntcy-slim-auth = { workspace = true } agntcy-slim-datapath = { workspace = true } agntcy-slim-mls = { workspace = true } agntcy-slim-version = { workspace = true } +base64 = { workspace = true } async-trait = { workspace = true } display-error-chain = { workspace = true } futures = { workspace = true } diff --git a/data-plane/core/session/src/errors.rs b/data-plane/core/session/src/errors.rs index 2ea54359e..33c165843 100644 --- a/data-plane/core/session/src/errors.rs +++ b/data-plane/core/session/src/errors.rs @@ -79,7 +79,7 @@ pub enum SessionError { MlsOp(#[from] MlsError), // Authorization and roles - #[error("auth error")] + #[error("auth error: {0}")] Auth(#[from] AuthError), // Acknowledgements and routing @@ -162,13 +162,13 @@ pub enum SessionError { ModeratorTaskUnsupportedPhase, #[error("unexpected timer id: {0}")] ModeratorTaskUnexpectedTimerId(u32), - #[error("failed to add participant to session")] + #[error("failed to add participant to session: {source}")] ModeratorTaskAddFailed { source: Box }, - #[error("failed to remove participant from session")] + #[error("failed to remove participant from session: {source}")] ModeratorTaskRemoveFailed { source: Box }, - #[error("failed to update session")] + #[error("failed to update session: {source}")] ModeratorTaskUpdateFailed { source: Box }, - #[error("failed to close session")] + #[error("failed to close session: {source}")] ModeratorTaskCloseFailed { source: Box }, } diff --git a/data-plane/core/session/src/mls_state.rs b/data-plane/core/session/src/mls_state.rs index c0513db6c..7a70c0406 100644 --- a/data-plane/core/session/src/mls_state.rs +++ b/data-plane/core/session/src/mls_state.rs @@ -366,33 +366,39 @@ where /// Builds the Authenticated Data (AAD) for header integrity checks fn build_aad(&self, msg: &Message) -> Vec { - let slim_header = msg.get_slim_header(); - let session_header = msg.get_session_header(); + build_aad(msg) + } +} - let payload_type = if let Some(payload) = msg.get_payload() { - if let Ok(app_payload) = payload.as_application_payload() { - app_payload.payload_type.clone() - } else { - String::new() - } +/// Builds the Authenticated Data (AAD) for header integrity checks +pub(crate) fn build_aad(msg: &Message) -> Vec { + let slim_header = msg.get_slim_header(); + let session_header = msg.get_session_header(); + + let payload_type = if let Some(payload) = msg.get_payload() { + if let Ok(app_payload) = payload.as_application_payload() { + app_payload.payload_type.clone() } else { String::new() - }; - - let aad = HeaderIntegrityAad { - version: 1, - source: Some(slim_header.get_source().clone()), - destination: Some(slim_header.get_dst().clone()), - identity: slim_header.get_identity().to_string(), - session_type: session_header.session_type() as i32, - session_message_type: session_header.session_message_type() as i32, - session_id: session_header.get_session_id(), - message_id: session_header.get_message_id(), - payload_type, - }; - - aad.encode_to_vec() - } + } + } else { + String::new() + }; + + let aad = HeaderIntegrityAad { + version: 1, + source: Some(slim_header.get_source().clone()), + destination: Some(slim_header.get_dst().clone()), + identity: slim_header.get_identity().to_string(), + session_type: session_header.session_type() as i32, + session_message_type: session_header.session_message_type() as i32, + session_id: session_header.get_session_id(), + message_id: session_header.get_message_id(), + payload_type, + sequence_number: slim_header.sequence_number, + }; + + aad.encode_to_vec() } #[derive(Debug)] diff --git a/data-plane/core/session/src/session_builder.rs b/data-plane/core/session/src/session_builder.rs index 733b4fda2..326f2e471 100644 --- a/data-plane/core/session/src/session_builder.rs +++ b/data-plane/core/session/src/session_builder.rs @@ -446,6 +446,7 @@ where graceful_shutdown_timeout: self.graceful_shutdown_timeout, subscription_manager, service_id: self.service_id.unwrap_or_default(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let wrapper = wrapper_constructor(inner, settings.clone()); diff --git a/data-plane/core/session/src/session_controller.rs b/data-plane/core/session/src/session_controller.rs index 7adecea51..8b91ef871 100644 --- a/data-plane/core/session/src/session_controller.rs +++ b/data-plane/core/session/src/session_controller.rs @@ -32,7 +32,11 @@ use crate::{ traits::{MessageHandler, ProcessingState}, }; -pub(crate) async fn verify_identity(msg: &Message, verifier: &V) -> Result<(), SessionError> +pub(crate) async fn verify_identity( + msg: &Message, + verifier: &V, + e2e_integrity_required: bool, +) -> Result<(), SessionError> where V: Verifier + Send + Sync, { @@ -40,6 +44,51 @@ where if verifier.try_verify(&identity).is_err() { verifier.verify(&identity).await?; } + + if e2e_integrity_required && msg.get_session_message_type().is_command_message() { + let slim_header = msg.get_slim_header(); + let Some(sig) = &slim_header.e2e_header_sig else { + return Err(SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid)); + }; + + #[derive(serde::Deserialize)] + struct CustomClaims { + pubkey: Option, + } + + #[derive(serde::Deserialize)] + struct IdentityClaims { + pubkey: Option, + custom_claims: Option, + } + + let claims_res = verifier.get_claims(&identity).await; + if let Err(ref e) = claims_res { + tracing::error!("verify_identity: get_claims failed with: {:?}", e); + } + let claims: IdentityClaims = claims_res?; + let pubkey = claims.pubkey + .or_else(|| claims.custom_claims.and_then(|c| c.pubkey)) + .ok_or_else(|| { + tracing::error!("verify_identity: pubkey not found in claims. claims_json: {:?}", identity); + SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid) + })?; + + use base64::Engine as _; + let pubkey_bytes_res = base64::engine::general_purpose::STANDARD.decode(&pubkey); + if let Err(ref e) = pubkey_bytes_res { + tracing::error!("verify_identity: base64 decode of pubkey failed with: {:?}", e); + } + let pubkey_bytes = pubkey_bytes_res + .map_err(|_| SessionError::Auth(slim_auth::errors::AuthError::TokenMalformed))?; + + let aad = crate::mls_state::build_aad(msg); + let verify_res = slim_auth::utils::verify_header_aad(&aad, sig, &pubkey_bytes); + if let Err(ref e) = verify_res { + tracing::error!("verify_identity: verify_header_aad failed: {:?}", e); + } + verify_res?; + } Ok(()) } @@ -182,6 +231,52 @@ impl SessionController { } } + async fn verify_and_check_replay( + msg: &Message, + settings: &SessionSettings, + ) -> Result<(), SessionError> + where + P: slim_auth::traits::TokenProvider + Send + Sync + Clone + 'static, + V: slim_auth::traits::Verifier + Send + Sync + Clone + 'static, + M: crate::subscription_manager::SubscriptionOps, + { + let msg_type = msg.get_session_message_type(); + let is_post_session_control = matches!( + msg_type, + ProtoSessionMessageType::LeaveRequest + | ProtoSessionMessageType::LeaveReply + | ProtoSessionMessageType::GroupAdd + | ProtoSessionMessageType::GroupRemove + | ProtoSessionMessageType::GroupClose + | ProtoSessionMessageType::GroupProposal + | ProtoSessionMessageType::GroupAck + | ProtoSessionMessageType::GroupNack + | ProtoSessionMessageType::Ping + ); + // Require E2E verification only when the sender included a signature (matches pre-session path). + let e2e_required = is_post_session_control && msg.get_slim_header().e2e_header_sig.is_some(); + + // 1. Verify E2E header signature and token + crate::session_controller::verify_identity(msg, &settings.identity_verifier, e2e_required).await?; + + // 2. Perform sequence number check for command messages + if e2e_required && msg.get_session_message_type().is_command_message() { + let sender = msg.get_source(); + let seq = msg.get_slim_header().sequence_number; + if let Some(s) = seq { + let mut seen_control_seqs = settings.seen_control_seqs.lock(); + let seen = seen_control_seqs.entry(sender.clone()).or_default(); + if !seen.insert(s) { + // Duplicate delivery (network retransmission or fanout). Handlers are idempotent. + return Ok(()); + } + } else { + return Err(SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid)); + } + } + Ok(()) + } + async fn processing_loop( mut inner: impl MessageHandler + 'static, mut rx: sync::mpsc::Receiver, @@ -194,7 +289,6 @@ impl SessionController { { // Start with an infinite timeout (will be updated on graceful shutdown) let mut shutdown_deadline = std::pin::pin!(tokio::time::sleep(Duration::MAX)); - // Init the inner components if let Err(e) = inner.init().await { tracing::error!(error = %e.chain(), "error during initialization of session"); @@ -210,10 +304,11 @@ impl SessionController { // Finish any ongoing processing before starting drain debug!("consuming pending messages before entering draining state"); while let Ok(msg) = rx.try_recv() { - if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &msg - && let Err(e) = crate::session_controller::verify_identity(message, &settings.identity_verifier).await { - debug!(error = %e.chain(), "dropping inbound message during drain: identity verification failed"); - continue; + if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &msg { + if let Err(e) = Self::verify_and_check_replay(message, &settings).await { + debug!(error = %e.chain(), "dropping inbound message during drain: verification or replay check failed"); + continue; + } } match inner.on_message(msg).await { Ok(output) => Self::dispatch_output(output, &settings).await, @@ -253,15 +348,16 @@ impl SessionController { continue; } - if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &session_message - && let Err(e) = crate::session_controller::verify_identity(message, &settings.identity_verifier).await { - debug!( - error = %e.chain(), - msg_type = %message.get_session_message_type().as_str_name(), - msg_id = %message.get_id(), - "dropping inbound message: identity verification failed", - ); - continue; + if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &session_message { + if let Err(e) = Self::verify_and_check_replay(message, &settings).await { + debug!( + error = %e.chain(), + msg_type = %message.get_session_message_type().as_str_name(), + msg_id = %message.get_id(), + "dropping inbound message: verification or replay check failed", + ); + continue; + } } let draining = inner.processing_state() == ProcessingState::Draining; @@ -286,7 +382,7 @@ impl SessionController { } } Err(e) => { - debug!( + tracing::error!( error=%e, "Error processing message{}", if draining { " during graceful shutdown" } else { "" } @@ -624,6 +720,9 @@ pub(crate) struct SessionControllerCommon< /// Maps (kind, name, conn) → subscription_id for route/subscription tracking. subscription_ids: HashMap<(SubscriptionKind, ProtoName, u64), u64>, + + /// Next sequence number to use for outbound control messages + pub(crate) next_control_seq: u64, } /// Distinguishes route entries from subscription entries in the subscription_ids map. @@ -659,6 +758,7 @@ where sender: controller_sender, processing_state: ProcessingState::Active, subscription_ids: HashMap::new(), + next_control_seq: 1, } } @@ -1881,6 +1981,7 @@ mod tests { graceful_shutdown_timeout: Some(Duration::from_secs(10)), subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let needs_drain = Arc::new(AtomicBool::new(true)); @@ -2057,6 +2158,7 @@ mod tests { graceful_shutdown_timeout, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), } } diff --git a/data-plane/core/session/src/session_layer.rs b/data-plane/core/session/src/session_layer.rs index c7879b9be..750174625 100644 --- a/data-plane/core/session/src/session_layer.rs +++ b/data-plane/core/session/src/session_layer.rs @@ -547,10 +547,11 @@ where Err(_) => return, }; + let e2e_required = message.get_slim_header().e2e_header_sig.is_some(); if let Err(e) = - crate::session_controller::verify_identity(&message, &layer.identity_verifier).await + crate::session_controller::verify_identity(&message, &layer.identity_verifier, e2e_required).await { - debug!( + error!( error = %e.chain(), msg_type = %session_message_type.as_str_name(), "dropping pre-session message: identity verification failed", diff --git a/data-plane/core/session/src/session_moderator.rs b/data-plane/core/session/src/session_moderator.rs index b6a2dbf52..c6161488a 100644 --- a/data-plane/core/session/src/session_moderator.rs +++ b/data-plane/core/session/src/session_moderator.rs @@ -22,7 +22,7 @@ use slim_mls::mls::Mls; use tracing::debug; use crate::{ - common::{MessageDirection, SessionMessage, SessionOutput}, + common::{MessageDirection, OutboundMessage, SessionMessage, SessionOutput}, errors::SessionError, mls_state::{MlsModeratorState, MlsState}, moderator_task::{ @@ -348,6 +348,21 @@ where output, &self.common.settings.identity_provider, )?; + if self.mls_state.is_some() { + for msg in &mut output.messages { + if let OutboundMessage::ToSlim(m) = msg { + if m.get_session_message_type().is_command_message() { + m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); + self.common.next_control_seq += 1; + + let aad = crate::mls_state::build_aad(m); + let private_key = self.common.settings.identity_provider.get_signature_secret_key()?; + let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; + m.get_slim_header_mut().e2e_header_sig = Some(signature); + } + } + } + } if let Some(mls_state) = &mut self.mls_state { mls_state.common.encrypt_output(output)?; } @@ -470,6 +485,9 @@ where participant_no_id.reset_id(); self.group_list.remove(&participant_no_id); + // Re-invited participants restart their control-seq counter; drop stale replay state. + self.common.settings.clear_seen_control_seqs(participant); + // Remove endpoint from local session self.remove_endpoint(participant); @@ -1336,6 +1354,7 @@ mod tests { graceful_shutdown_timeout: None, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let inner = MockInnerHandler::new(); @@ -1700,6 +1719,7 @@ mod tests { graceful_shutdown_timeout: None, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let inner = MockInnerHandler::new(); @@ -1781,6 +1801,7 @@ mod tests { graceful_shutdown_timeout: None, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let inner = MockInnerHandler::new(); @@ -1931,6 +1952,7 @@ mod tests { graceful_shutdown_timeout: None, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let inner = MockInnerHandler::new(); diff --git a/data-plane/core/session/src/session_participant.rs b/data-plane/core/session/src/session_participant.rs index 03feabaa5..55296fefd 100644 --- a/data-plane/core/session/src/session_participant.rs +++ b/data-plane/core/session/src/session_participant.rs @@ -16,7 +16,7 @@ use slim_mls::mls::Mls; use tracing::debug; use crate::{ - common::{MessageDirection, SessionMessage, SessionOutput}, + common::{MessageDirection, OutboundMessage, SessionMessage, SessionOutput}, errors::SessionError, mls_state::MlsState, session_controller::SessionControllerCommon, @@ -301,6 +301,25 @@ where output, &self.common.settings.identity_provider, )?; + if self.mls_state.is_some() { + for msg in &mut output.messages { + if let OutboundMessage::ToSlim(m) = msg + && m.get_session_message_type().is_command_message() + { + m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); + self.common.next_control_seq += 1; + + let aad = crate::mls_state::build_aad(m); + let private_key = self + .common + .settings + .identity_provider + .get_signature_secret_key()?; + let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; + m.get_slim_header_mut().e2e_header_sig = Some(signature); + } + } + } if let Some(mls_state) = &mut self.mls_state { mls_state.encrypt_output(output)?; } @@ -802,6 +821,7 @@ mod tests { graceful_shutdown_timeout: None, subscription_manager, service_id: String::new(), + seen_control_seqs: crate::session_settings::new_seen_control_seqs(), }; let inner = MockInnerHandler::new(); diff --git a/data-plane/core/session/src/session_settings.rs b/data-plane/core/session/src/session_settings.rs index ef590f3c1..56f83b850 100644 --- a/data-plane/core/session/src/session_settings.rs +++ b/data-plane/core/session/src/session_settings.rs @@ -1,6 +1,12 @@ // Copyright AGNTCY Contributors (https://github.com/agntcy) // SPDX-License-Identifier: Apache-2.0 +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use parking_lot::Mutex; use slim_auth::traits::{TokenProvider, Verifier}; use slim_datapath::api::ProtoName; @@ -75,4 +81,26 @@ where /// Service ID for tracing — identifies which service instance owns this session pub(crate) service_id: String, + + /// Seen control-message sequence numbers per remote sender (E2E replay protection). + pub(crate) seen_control_seqs: Arc>>>, +} + +pub(crate) fn new_seen_control_seqs() -> Arc>>> { + Arc::new(Mutex::new(HashMap::new())) +} + +impl SessionSettings +where + P: TokenProvider + Send + Sync + Clone + 'static, + V: Verifier + Send + Sync + Clone + 'static, + M: SubscriptionOps, +{ + /// Forget replay state for a participant that left or was removed so re-invites can restart seq. + pub(crate) fn clear_seen_control_seqs(&self, participant: &ProtoName) { + let components = participant.str_components(); + self.seen_control_seqs + .lock() + .retain(|k, _| k.str_components() != components); + } } diff --git a/proto/data-plane/v1/data_plane.proto b/proto/data-plane/v1/data_plane.proto index 9e7d88fb1..3e7372fe0 100644 --- a/proto/data-plane/v1/data_plane.proto +++ b/proto/data-plane/v1/data_plane.proto @@ -65,6 +65,8 @@ message SLIMHeader { optional uint64 incoming_conn = 8; optional bool error = 9; optional bytes header_mac = 10; + optional uint64 sequence_number = 11; + optional bytes e2e_header_sig = 12; } message SessionHeader { @@ -235,6 +237,7 @@ message HeaderIntegrityAad { uint32 session_id = 7; uint32 message_id = 8; string payload_type = 9; + optional uint64 sequence_number = 10; } // Group Add Payload From d5e94d431a4a16833f25e715dce09c07a7b29c98 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Thu, 11 Jun 2026 16:07:23 +0200 Subject: [PATCH 02/12] feat: add header integrity validation and replay protection to control messages Signed-off-by: Mark Marton --- data-plane/core/auth/src/utils.rs | 16 +++++++++++++ data-plane/core/session/Cargo.toml | 1 + .../core/session/src/session_controller.rs | 10 ++++---- .../core/session/src/session_moderator.rs | 24 +++++++++++-------- 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index bb23e75cf..79954709d 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -32,12 +32,24 @@ pub fn generate_mls_signature_keys() -> Result<(Vec, Vec), crate::errors } /// Sign the header AAD bytes using an Ed25519 private key. +<<<<<<< HEAD pub fn sign_header_aad(aad_bytes: &[u8], private_key_bytes: &[u8]) -> Result, crate::errors::AuthError> { use aws_lc_rs::signature::Ed25519KeyPair; let key_pair = Ed25519KeyPair::from_seed_and_public_key( &private_key_bytes[..32], &private_key_bytes[32..], ) +======= +pub fn sign_header_aad( + aad_bytes: &[u8], + private_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { + use aws_lc_rs::signature::Ed25519KeyPair; + let key_pair = Ed25519KeyPair::from_seed_and_public_key( + &private_key_bytes[..32], + &private_key_bytes[32..], + ) +>>>>>>> cf013eb9 (feat: add header integrity validation and replay protection to control messages) .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; let signature = key_pair.sign(aad_bytes); Ok(signature.as_ref().to_vec()) @@ -49,7 +61,11 @@ pub fn verify_header_aad( signature_bytes: &[u8], public_key_bytes: &[u8], ) -> Result<(), crate::errors::AuthError> { +<<<<<<< HEAD use aws_lc_rs::signature::{UnparsedPublicKey, ED25519}; +======= + use aws_lc_rs::signature::{ED25519, UnparsedPublicKey}; +>>>>>>> cf013eb9 (feat: add header integrity validation and replay protection to control messages) let public_key = UnparsedPublicKey::new(&ED25519, public_key_bytes); public_key .verify(aad_bytes, signature_bytes) diff --git a/data-plane/core/session/Cargo.toml b/data-plane/core/session/Cargo.toml index f6cecec2c..b34de5d25 100644 --- a/data-plane/core/session/Cargo.toml +++ b/data-plane/core/session/Cargo.toml @@ -15,6 +15,7 @@ agntcy-slim-mls = { workspace = true } agntcy-slim-version = { workspace = true } base64 = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } display-error-chain = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } diff --git a/data-plane/core/session/src/session_controller.rs b/data-plane/core/session/src/session_controller.rs index 8b91ef871..335109669 100644 --- a/data-plane/core/session/src/session_controller.rs +++ b/data-plane/core/session/src/session_controller.rs @@ -304,11 +304,10 @@ impl SessionController { // Finish any ongoing processing before starting drain debug!("consuming pending messages before entering draining state"); while let Ok(msg) = rx.try_recv() { - if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &msg { - if let Err(e) = Self::verify_and_check_replay(message, &settings).await { + if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &msg + && let Err(e) = Self::verify_and_check_replay(message, &settings).await { debug!(error = %e.chain(), "dropping inbound message during drain: verification or replay check failed"); continue; - } } match inner.on_message(msg).await { Ok(output) => Self::dispatch_output(output, &settings).await, @@ -348,8 +347,8 @@ impl SessionController { continue; } - if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &session_message { - if let Err(e) = Self::verify_and_check_replay(message, &settings).await { + if let SessionMessage::OnMessage { message, direction: MessageDirection::North, .. } = &session_message + && let Err(e) = Self::verify_and_check_replay(message, &settings).await { debug!( error = %e.chain(), msg_type = %message.get_session_message_type().as_str_name(), @@ -357,7 +356,6 @@ impl SessionController { "dropping inbound message: verification or replay check failed", ); continue; - } } let draining = inner.processing_state() == ProcessingState::Draining; diff --git a/data-plane/core/session/src/session_moderator.rs b/data-plane/core/session/src/session_moderator.rs index c6161488a..831a4efad 100644 --- a/data-plane/core/session/src/session_moderator.rs +++ b/data-plane/core/session/src/session_moderator.rs @@ -350,16 +350,20 @@ where )?; if self.mls_state.is_some() { for msg in &mut output.messages { - if let OutboundMessage::ToSlim(m) = msg { - if m.get_session_message_type().is_command_message() { - m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); - self.common.next_control_seq += 1; - - let aad = crate::mls_state::build_aad(m); - let private_key = self.common.settings.identity_provider.get_signature_secret_key()?; - let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; - m.get_slim_header_mut().e2e_header_sig = Some(signature); - } + if let OutboundMessage::ToSlim(m) = msg + && m.get_session_message_type().is_command_message() + { + m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); + self.common.next_control_seq += 1; + + let aad = crate::mls_state::build_aad(m); + let private_key = self + .common + .settings + .identity_provider + .get_signature_secret_key()?; + let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; + m.get_slim_header_mut().e2e_header_sig = Some(signature); } } } From 6291e32a64b14d2a1e5303b521c099e2a5fad139 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Thu, 11 Jun 2026 16:07:23 +0200 Subject: [PATCH 03/12] feat: add header integrity validation and replay protection to control messages Signed-off-by: Mark Marton --- data-plane/core/auth/src/utils.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 79954709d..8a61d18c1 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -32,24 +32,12 @@ pub fn generate_mls_signature_keys() -> Result<(Vec, Vec), crate::errors } /// Sign the header AAD bytes using an Ed25519 private key. -<<<<<<< HEAD pub fn sign_header_aad(aad_bytes: &[u8], private_key_bytes: &[u8]) -> Result, crate::errors::AuthError> { use aws_lc_rs::signature::Ed25519KeyPair; let key_pair = Ed25519KeyPair::from_seed_and_public_key( &private_key_bytes[..32], &private_key_bytes[32..], ) -======= -pub fn sign_header_aad( - aad_bytes: &[u8], - private_key_bytes: &[u8], -) -> Result, crate::errors::AuthError> { - use aws_lc_rs::signature::Ed25519KeyPair; - let key_pair = Ed25519KeyPair::from_seed_and_public_key( - &private_key_bytes[..32], - &private_key_bytes[32..], - ) ->>>>>>> cf013eb9 (feat: add header integrity validation and replay protection to control messages) .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; let signature = key_pair.sign(aad_bytes); Ok(signature.as_ref().to_vec()) @@ -61,11 +49,7 @@ pub fn verify_header_aad( signature_bytes: &[u8], public_key_bytes: &[u8], ) -> Result<(), crate::errors::AuthError> { -<<<<<<< HEAD - use aws_lc_rs::signature::{UnparsedPublicKey, ED25519}; -======= use aws_lc_rs::signature::{ED25519, UnparsedPublicKey}; ->>>>>>> cf013eb9 (feat: add header integrity validation and replay protection to control messages) let public_key = UnparsedPublicKey::new(&ED25519, public_key_bytes); public_key .verify(aad_bytes, signature_bytes) From 28a30bef0f4f2a9ea3f3045f13146738ae6e96f4 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Fri, 12 Jun 2026 17:16:29 +0200 Subject: [PATCH 04/12] fix: fix linting Signed-off-by: Mark Marton --- data-plane/core/auth/src/utils.rs | 11 ++++--- .../core/datapath/benches/name_benchmark.rs | 3 -- .../src/api/gen/dataplane.proto.v1.rs | 4 +-- data-plane/core/datapath/src/header_mac.rs | 3 -- .../core/datapath/src/messages/utils.rs | 3 -- data-plane/core/session/Cargo.toml | 1 - .../core/session/src/session_controller.rs | 29 ++++++++++++++----- data-plane/core/session/src/session_layer.rs | 8 +++-- proto/data-plane/v1/data_plane.proto | 12 ++------ 9 files changed, 38 insertions(+), 36 deletions(-) diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 8a61d18c1..3b44fe9dd 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -32,12 +32,15 @@ pub fn generate_mls_signature_keys() -> Result<(Vec, Vec), crate::errors } /// Sign the header AAD bytes using an Ed25519 private key. -pub fn sign_header_aad(aad_bytes: &[u8], private_key_bytes: &[u8]) -> Result, crate::errors::AuthError> { +pub fn sign_header_aad( + aad_bytes: &[u8], + private_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { use aws_lc_rs::signature::Ed25519KeyPair; let key_pair = Ed25519KeyPair::from_seed_and_public_key( - &private_key_bytes[..32], - &private_key_bytes[32..], - ) + &private_key_bytes[..32], + &private_key_bytes[32..], + ) .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; let signature = key_pair.sign(aad_bytes); Ok(signature.as_ref().to_vec()) diff --git a/data-plane/core/datapath/benches/name_benchmark.rs b/data-plane/core/datapath/benches/name_benchmark.rs index fafbad790..29d16121a 100644 --- a/data-plane/core/datapath/benches/name_benchmark.rs +++ b/data-plane/core/datapath/benches/name_benchmark.rs @@ -39,12 +39,9 @@ fn make_slim_header() -> SlimHeader { incoming_conn: None, error: None, header_mac: None, -<<<<<<< control-message-header-integrity e2e_header_sig: None, sequence_number: None, -======= ttl: DEFAULT_TTL, ->>>>>>> main } } diff --git a/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs b/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs index 56403e297..e385a43f6 100644 --- a/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs +++ b/data-plane/core/datapath/src/api/gen/dataplane.proto.v1.rs @@ -96,9 +96,9 @@ pub struct SlimHeader { pub error: ::core::option::Option, #[prost(bytes = "vec", optional, tag = "11")] pub header_mac: ::core::option::Option<::prost::alloc::vec::Vec>, - #[prost(uint64, optional, tag = "11")] + #[prost(uint64, optional, tag = "12")] pub sequence_number: ::core::option::Option, - #[prost(bytes = "vec", optional, tag = "12")] + #[prost(bytes = "vec", optional, tag = "13")] pub e2e_header_sig: ::core::option::Option<::prost::alloc::vec::Vec>, } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/data-plane/core/datapath/src/header_mac.rs b/data-plane/core/datapath/src/header_mac.rs index 385bab2d3..1c9270662 100644 --- a/data-plane/core/datapath/src/header_mac.rs +++ b/data-plane/core/datapath/src/header_mac.rs @@ -321,12 +321,9 @@ mod tests { incoming_conn: Some(999), error: Some(false), header_mac: None, -<<<<<<< control-message-header-integrity sequence_number: None, e2e_header_sig: None, -======= ttl: DEFAULT_TTL, ->>>>>>> main } } diff --git a/data-plane/core/datapath/src/messages/utils.rs b/data-plane/core/datapath/src/messages/utils.rs index 5cbb4bca0..f9ae2bfd2 100644 --- a/data-plane/core/datapath/src/messages/utils.rs +++ b/data-plane/core/datapath/src/messages/utils.rs @@ -299,12 +299,9 @@ impl SlimHeader { incoming_conn: flags.incoming_conn, error: flags.error, header_mac: None, -<<<<<<< control-message-header-integrity sequence_number: None, e2e_header_sig: None, -======= ttl: flags.ttl, ->>>>>>> main } } diff --git a/data-plane/core/session/Cargo.toml b/data-plane/core/session/Cargo.toml index b34de5d25..b28d923ed 100644 --- a/data-plane/core/session/Cargo.toml +++ b/data-plane/core/session/Cargo.toml @@ -13,7 +13,6 @@ agntcy-slim-auth = { workspace = true } agntcy-slim-datapath = { workspace = true } agntcy-slim-mls = { workspace = true } agntcy-slim-version = { workspace = true } -base64 = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } display-error-chain = { workspace = true } diff --git a/data-plane/core/session/src/session_controller.rs b/data-plane/core/session/src/session_controller.rs index 335109669..499704dce 100644 --- a/data-plane/core/session/src/session_controller.rs +++ b/data-plane/core/session/src/session_controller.rs @@ -48,7 +48,9 @@ where if e2e_integrity_required && msg.get_session_message_type().is_command_message() { let slim_header = msg.get_slim_header(); let Some(sig) = &slim_header.e2e_header_sig else { - return Err(SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid)); + return Err(SessionError::Auth( + slim_auth::errors::AuthError::TokenInvalid, + )); }; #[derive(serde::Deserialize)] @@ -67,17 +69,24 @@ where tracing::error!("verify_identity: get_claims failed with: {:?}", e); } let claims: IdentityClaims = claims_res?; - let pubkey = claims.pubkey + let pubkey = claims + .pubkey .or_else(|| claims.custom_claims.and_then(|c| c.pubkey)) .ok_or_else(|| { - tracing::error!("verify_identity: pubkey not found in claims. claims_json: {:?}", identity); + tracing::error!( + "verify_identity: pubkey not found in claims. claims_json: {:?}", + identity + ); SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid) })?; - + use base64::Engine as _; let pubkey_bytes_res = base64::engine::general_purpose::STANDARD.decode(&pubkey); if let Err(ref e) = pubkey_bytes_res { - tracing::error!("verify_identity: base64 decode of pubkey failed with: {:?}", e); + tracing::error!( + "verify_identity: base64 decode of pubkey failed with: {:?}", + e + ); } let pubkey_bytes = pubkey_bytes_res .map_err(|_| SessionError::Auth(slim_auth::errors::AuthError::TokenMalformed))?; @@ -254,10 +263,12 @@ impl SessionController { | ProtoSessionMessageType::Ping ); // Require E2E verification only when the sender included a signature (matches pre-session path). - let e2e_required = is_post_session_control && msg.get_slim_header().e2e_header_sig.is_some(); + let e2e_required = + is_post_session_control && msg.get_slim_header().e2e_header_sig.is_some(); // 1. Verify E2E header signature and token - crate::session_controller::verify_identity(msg, &settings.identity_verifier, e2e_required).await?; + crate::session_controller::verify_identity(msg, &settings.identity_verifier, e2e_required) + .await?; // 2. Perform sequence number check for command messages if e2e_required && msg.get_session_message_type().is_command_message() { @@ -271,7 +282,9 @@ impl SessionController { return Ok(()); } } else { - return Err(SessionError::Auth(slim_auth::errors::AuthError::TokenInvalid)); + return Err(SessionError::Auth( + slim_auth::errors::AuthError::TokenInvalid, + )); } } Ok(()) diff --git a/data-plane/core/session/src/session_layer.rs b/data-plane/core/session/src/session_layer.rs index 750174625..4ab81105a 100644 --- a/data-plane/core/session/src/session_layer.rs +++ b/data-plane/core/session/src/session_layer.rs @@ -548,8 +548,12 @@ where }; let e2e_required = message.get_slim_header().e2e_header_sig.is_some(); - if let Err(e) = - crate::session_controller::verify_identity(&message, &layer.identity_verifier, e2e_required).await + if let Err(e) = crate::session_controller::verify_identity( + &message, + &layer.identity_verifier, + e2e_required, + ) + .await { error!( error = %e.chain(), diff --git a/proto/data-plane/v1/data_plane.proto b/proto/data-plane/v1/data_plane.proto index 0b2178b39..796e90e2e 100644 --- a/proto/data-plane/v1/data_plane.proto +++ b/proto/data-plane/v1/data_plane.proto @@ -60,22 +60,14 @@ message SLIMHeader { string identity = 3; uint32 fanout = 4; string version = 5; -<<<<<<< control-message-header-integrity - optional uint64 recv_from = 6; - optional uint64 forward_to = 7; - optional uint64 incoming_conn = 8; - optional bool error = 9; - optional bytes header_mac = 10; - optional uint64 sequence_number = 11; - optional bytes e2e_header_sig = 12; -======= uint32 ttl = 6; optional uint64 recv_from = 7; optional uint64 forward_to = 8; optional uint64 incoming_conn = 9; optional bool error = 10; optional bytes header_mac = 11; ->>>>>>> main + optional uint64 sequence_number = 12; + optional bytes e2e_header_sig = 13; } message SessionHeader { From 0ca9333e70573d0ef4395328f25daa09e7bedc0b Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Mon, 15 Jun 2026 13:24:02 +0200 Subject: [PATCH 05/12] fix: fix for false positive CodeQL warning Signed-off-by: Mark Marton --- data-plane/core/session/src/session_layer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-plane/core/session/src/session_layer.rs b/data-plane/core/session/src/session_layer.rs index 4ab81105a..9dff23e44 100644 --- a/data-plane/core/session/src/session_layer.rs +++ b/data-plane/core/session/src/session_layer.rs @@ -555,8 +555,9 @@ where ) .await { + let err = e.chain(); error!( - error = %e.chain(), + error = %err, msg_type = %session_message_type.as_str_name(), "dropping pre-session message: identity verification failed", ); From 795a826bd1e29eed23c1bdbb6e5d038f87e5c8aa Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Mon, 15 Jun 2026 15:13:54 +0200 Subject: [PATCH 06/12] fix: remove redundant AAD generator function Signed-off-by: Mark Marton --- data-plane/core/session/src/mls_state.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/data-plane/core/session/src/mls_state.rs b/data-plane/core/session/src/mls_state.rs index 7a70c0406..0027f3af5 100644 --- a/data-plane/core/session/src/mls_state.rs +++ b/data-plane/core/session/src/mls_state.rs @@ -308,7 +308,7 @@ where let payload = msg.get_payload().unwrap().as_application_payload()?; debug!("Encrypting message for group member"); - let aad = self.build_aad(msg); + let aad = build_aad(msg); let encrypted_payload = self.mls.encrypt_message(&payload.blob, aad)?; msg.set_payload( @@ -345,7 +345,7 @@ where }; if should_validate { - let expected_aad = self.build_aad(msg); + let expected_aad = build_aad(msg); if expected_aad != auth_data { let expected_decoded = HeaderIntegrityAad::decode(&expected_aad[..]); let got_decoded = HeaderIntegrityAad::decode(&auth_data[..]); @@ -363,11 +363,6 @@ where ); Ok(()) } - - /// Builds the Authenticated Data (AAD) for header integrity checks - fn build_aad(&self, msg: &Message) -> Vec { - build_aad(msg) - } } /// Builds the Authenticated Data (AAD) for header integrity checks From 9c2ec9eba7a1e3139641c759cac3c488208c2d62 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Tue, 16 Jun 2026 13:46:27 +0200 Subject: [PATCH 07/12] fix: add header signing for both cypher algo implementations Signed-off-by: Mark Marton --- data-plane/Cargo.lock | 178 ++++++++++++ data-plane/core/auth/Cargo.toml | 2 + data-plane/core/auth/src/errors.rs | 2 + data-plane/core/auth/src/lib.rs | 2 +- data-plane/core/auth/src/utils.rs | 254 +++++++++++++++++- data-plane/core/mls/src/mls.rs | 7 + data-plane/core/session/src/mls_state.rs | 1 + .../core/session/src/session_moderator.rs | 26 +- .../core/session/src/session_participant.rs | 26 +- 9 files changed, 466 insertions(+), 32 deletions(-) diff --git a/data-plane/Cargo.lock b/data-plane/Cargo.lock index e29978a5b..d7c0854ed 100644 --- a/data-plane/Cargo.lock +++ b/data-plane/Cargo.lock @@ -39,6 +39,7 @@ dependencies = [ "cfg-if", "criterion", "display-error-chain", + "ed25519-dalek", "futures", "getrandom 0.3.4", "headers", @@ -48,6 +49,7 @@ dependencies = [ "jsonwebtoken", "notify", "oauth2", + "p256", "parking_lot", "pin-project", "prost-types", @@ -716,6 +718,12 @@ dependencies = [ "tower-service", ] +[[package]] +name = "base16ct" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" + [[package]] name = "base64" version = "0.21.7" @@ -1114,6 +1122,18 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + [[package]] name = "crypto-common" version = "0.1.7" @@ -1124,6 +1144,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "darling" version = "0.20.11" @@ -1251,6 +1297,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid 0.9.6", + "zeroize", ] [[package]] @@ -1366,6 +1413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid 0.9.6", "crypto-common", "subtle", ] @@ -1439,12 +1487,67 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "ecdsa" +version = "0.16.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" +dependencies = [ + "der 0.7.10", + "digest", + "elliptic-curve", + "rfc6979", + "signature", + "spki", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "rand_core 0.6.4", + "sha2", + "subtle", +] + [[package]] name = "either" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" +[[package]] +name = "elliptic-curve" +version = "0.13.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" +dependencies = [ + "base16ct", + "crypto-bigint", + "digest", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1496,6 +1599,22 @@ dependencies = [ "utf-8", ] +[[package]] +name = "ff" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0b50bfb653653f9ca9095b427bed08ab8d75a137839d9ad64eb11810d5b6393" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1652,6 +1771,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", + "zeroize", ] [[package]] @@ -1701,6 +1821,17 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "group" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "h2" version = "0.4.14" @@ -2817,6 +2948,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "p256" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" +dependencies = [ + "ecdsa", + "elliptic-curve", + "primeorder", + "sha2", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -2989,6 +3132,15 @@ dependencies = [ "syn", ] +[[package]] +name = "primeorder" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" +dependencies = [ + "elliptic-curve", +] + [[package]] name = "proc-macro-crate" version = "3.5.0" @@ -3366,6 +3518,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rfc6979" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" +dependencies = [ + "hmac", + "subtle", +] + [[package]] name = "ring" version = "0.17.14" @@ -3587,6 +3749,20 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sec1" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" +dependencies = [ + "base16ct", + "der 0.7.10", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "3.7.0" @@ -3821,6 +3997,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ + "digest", "rand_core 0.6.4", ] @@ -3930,6 +4107,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ + "base64ct", "der 0.7.10", ] diff --git a/data-plane/core/auth/Cargo.toml b/data-plane/core/auth/Cargo.toml index 3a1fdca29..dccae7e20 100644 --- a/data-plane/core/auth/Cargo.toml +++ b/data-plane/core/auth/Cargo.toml @@ -56,8 +56,10 @@ wiremock = { workspace = true } # native-only). `getrandom` with the wasm_js backend is required so that # `rand` works in the browser. [target.'cfg(target_arch = "wasm32")'.dependencies] +ed25519-dalek = { version = "2.1", default-features = false, features = ["rand_core"] } getrandom = { workspace = true } hmac = { workspace = true } +p256 = { version = "0.13", default-features = false, features = ["ecdsa", "pkcs8", "std"] } sha2 = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] diff --git a/data-plane/core/auth/src/errors.rs b/data-plane/core/auth/src/errors.rs index 0859fd57c..98a47601c 100644 --- a/data-plane/core/auth/src/errors.rs +++ b/data-plane/core/auth/src/errors.rs @@ -171,6 +171,8 @@ pub enum AuthError { // MLS #[error("MLS is not supported by this provider")] MlsNotSupported, + #[error("MLS signature key generation failed")] + MlsKeyGenerationFailed, #[error("public key not found in identity claims")] PublicKeyNotFound, #[error("subject not found in identity claims")] diff --git a/data-plane/core/auth/src/lib.rs b/data-plane/core/auth/src/lib.rs index eb280b3fd..1de01795a 100644 --- a/data-plane/core/auth/src/lib.rs +++ b/data-plane/core/auth/src/lib.rs @@ -8,6 +8,7 @@ pub(crate) mod mac; pub mod metadata; pub mod shared_secret; pub mod traits; +pub mod utils; // Native-only modules cfg_if::cfg_if! { @@ -21,5 +22,4 @@ pub mod oidc; pub mod resolver; #[cfg(not(target_family = "windows"))] pub mod spire; -pub mod utils; }} diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 556d1162f..46eda92ca 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -5,34 +5,244 @@ use base64::Engine; -/// Sign the header AAD bytes using an Ed25519 private key. +/// Sign the header AAD bytes using the MLS signature key pair. +/// +/// Supports Ed25519 (Curve25519 MLS ciphersuite) and ECDSA P-256 (default MLS +/// ciphersuite), selected from the public key encoding length. pub fn sign_header_aad( aad_bytes: &[u8], private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { + match public_key_bytes.len() { + 32 => sign_header_aad_ed25519(aad_bytes, private_key_bytes, public_key_bytes), + 33 | 65 => sign_header_aad_p256(aad_bytes, private_key_bytes, public_key_bytes), + _ => Err(crate::errors::AuthError::MlsKeyGenerationFailed), + } +} + +/// Verify the header AAD signature using an MLS signature public key. +pub fn verify_header_aad( + aad_bytes: &[u8], + signature_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result<(), crate::errors::AuthError> { + match public_key_bytes.len() { + 32 => verify_header_aad_ed25519(aad_bytes, signature_bytes, public_key_bytes), + 33 | 65 => verify_header_aad_p256(aad_bytes, signature_bytes, public_key_bytes), + _ => Err(crate::errors::AuthError::TokenInvalid), + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn sign_header_aad_ed25519( + aad_bytes: &[u8], + private_key_bytes: &[u8], + public_key_bytes: &[u8], ) -> Result, crate::errors::AuthError> { use aws_lc_rs::signature::Ed25519KeyPair; - let key_pair = Ed25519KeyPair::from_seed_and_public_key( - &private_key_bytes[..32], - &private_key_bytes[32..], - ) + + // MLS may store Ed25519 secrets either as a 32-byte seed with the public key + // kept separately, or as a 64-byte seed||public concatenation (see + // mls-rs-crypto-awslc x509 import paths). + let key_pair = if private_key_bytes.len() >= 64 { + Ed25519KeyPair::from_seed_and_public_key( + &private_key_bytes[..32], + &private_key_bytes[32..64], + ) + } else { + Ed25519KeyPair::from_seed_and_public_key(private_key_bytes, public_key_bytes) + } .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; let signature = key_pair.sign(aad_bytes); Ok(signature.as_ref().to_vec()) } -/// Verify the header AAD signature using an Ed25519 public key. -pub fn verify_header_aad( +#[cfg(target_arch = "wasm32")] +fn sign_header_aad_ed25519( + aad_bytes: &[u8], + private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { + use ed25519_dalek::Signer; + + let signing_key = ed25519_signing_key(private_key_bytes, public_key_bytes)?; + Ok(signing_key.sign(aad_bytes).to_bytes().to_vec()) +} + +#[cfg(not(target_arch = "wasm32"))] +fn sign_header_aad_p256( + aad_bytes: &[u8], + private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { + use aws_lc_rs::rand::SystemRandom; + use aws_lc_rs::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_ASN1_SIGNING}; + + let key_pair = EcdsaKeyPair::from_private_key_and_public_key( + &ECDSA_P256_SHA256_ASN1_SIGNING, + private_key_bytes, + public_key_bytes, + ) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + let rng = SystemRandom::new(); + let signature = key_pair + .sign(&rng, aad_bytes) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + Ok(signature.as_ref().to_vec()) +} + +#[cfg(target_arch = "wasm32")] +fn sign_header_aad_p256( + aad_bytes: &[u8], + private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result, crate::errors::AuthError> { + use p256::ecdsa::signature::Signer as _; + use p256::ecdsa::Signature; + + let signing_key = p256_signing_key(private_key_bytes, public_key_bytes)?; + let signature: Signature = signing_key.sign(aad_bytes); + Ok(signature.to_der().as_bytes().to_vec()) +} + +#[cfg(not(target_arch = "wasm32"))] +fn verify_header_aad_ed25519( aad_bytes: &[u8], signature_bytes: &[u8], public_key_bytes: &[u8], ) -> Result<(), crate::errors::AuthError> { use aws_lc_rs::signature::{ED25519, UnparsedPublicKey}; + let public_key = UnparsedPublicKey::new(&ED25519, public_key_bytes); public_key .verify(aad_bytes, signature_bytes) .map_err(|_| crate::errors::AuthError::TokenInvalid) } +#[cfg(target_arch = "wasm32")] +fn verify_header_aad_ed25519( + aad_bytes: &[u8], + signature_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result<(), crate::errors::AuthError> { + use ed25519_dalek::{Signature, Verifier, VerifyingKey}; + + let verifying_key = VerifyingKey::from_bytes( + public_key_bytes + .try_into() + .map_err(|_| crate::errors::AuthError::TokenInvalid)?, + ) + .map_err(|_| crate::errors::AuthError::TokenInvalid)?; + let signature = Signature::from_bytes( + signature_bytes + .try_into() + .map_err(|_| crate::errors::AuthError::TokenInvalid)?, + ); + verifying_key + .verify(aad_bytes, &signature) + .map_err(|_| crate::errors::AuthError::TokenInvalid) +} + +#[cfg(not(target_arch = "wasm32"))] +fn verify_header_aad_p256( + aad_bytes: &[u8], + signature_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result<(), crate::errors::AuthError> { + use aws_lc_rs::signature::{ECDSA_P256_SHA256_ASN1, UnparsedPublicKey}; + + let public_key = UnparsedPublicKey::new(&ECDSA_P256_SHA256_ASN1, public_key_bytes); + public_key + .verify(aad_bytes, signature_bytes) + .map_err(|_| crate::errors::AuthError::TokenInvalid) +} + +#[cfg(target_arch = "wasm32")] +fn verify_header_aad_p256( + aad_bytes: &[u8], + signature_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result<(), crate::errors::AuthError> { + use p256::ecdsa::signature::Verifier; + use p256::ecdsa::Signature; + + let verifying_key = p256_verifying_key(public_key_bytes)?; + let signature = + Signature::from_der(signature_bytes).map_err(|_| crate::errors::AuthError::TokenInvalid)?; + verifying_key + .verify(aad_bytes, &signature) + .map_err(|_| crate::errors::AuthError::TokenInvalid) +} + +#[cfg(target_arch = "wasm32")] +fn ed25519_signing_key( + private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result { + use ed25519_dalek::SigningKey; + + if private_key_bytes.len() >= 64 { + let keypair: [u8; 64] = private_key_bytes[..64] + .try_into() + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + SigningKey::from_keypair_bytes(&keypair) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed) + } else { + let seed: [u8; 32] = private_key_bytes + .try_into() + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + let signing_key = SigningKey::from_bytes(&seed); + let expected_public = signing_key.verifying_key().to_bytes(); + let provided_public: [u8; 32] = public_key_bytes + .try_into() + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + if expected_public != provided_public { + return Err(crate::errors::AuthError::MlsKeyGenerationFailed); + } + Ok(signing_key) + } +} + +#[cfg(target_arch = "wasm32")] +fn p256_signing_key( + private_key_bytes: &[u8], + public_key_bytes: &[u8], +) -> Result { + use p256::ecdsa::SigningKey; + use p256::pkcs8::DecodePrivateKey; + + if private_key_bytes.len() == 32 { + let secret_key = p256::SecretKey::from_bytes(private_key_bytes.into()) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + let signing_key = SigningKey::from(&secret_key); + let verifying_key = p256_verifying_key(public_key_bytes)?; + if *signing_key.verifying_key() != verifying_key { + return Err(crate::errors::AuthError::MlsKeyGenerationFailed); + } + Ok(signing_key) + } else { + SigningKey::from_pkcs8_der(private_key_bytes) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed) + } +} + +#[cfg(target_arch = "wasm32")] +fn p256_verifying_key( + public_key_bytes: &[u8], +) -> Result { + use p256::ecdsa::VerifyingKey; + use p256::elliptic_curve::sec1::FromEncodedPoint; + use p256::EncodedPoint; + + let point = EncodedPoint::from_bytes(public_key_bytes) + .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; + let public_key = p256::PublicKey::from_encoded_point(&point) + .into_option() + .ok_or(crate::errors::AuthError::MlsKeyGenerationFailed)?; + Ok(VerifyingKey::from(&public_key)) +} + /// Convert arbitrary bytes into a PEM-formatted string with the provided header/footer. /// The body is wrapped at 64 character lines per RFC 7468 guidance. /// Header/footer should include the BEGIN/END lines with trailing/leading newlines as desired. @@ -60,11 +270,37 @@ pub fn bytes_to_pem(key_bytes: &[u8], header: &str, footer: &str) -> String { mod tests { use super::*; + #[cfg(not(target_arch = "wasm32"))] + fn generate_test_ed25519_keys() -> (Vec, Vec) { + use aws_lc_rs::rand::{SecureRandom, SystemRandom}; + use aws_lc_rs::signature::{Ed25519KeyPair, KeyPair}; + + let rng = SystemRandom::new(); + let mut seed_bytes = [0u8; 32]; + rng.fill(&mut seed_bytes).expect("rng fill"); + let key_pair = + Ed25519KeyPair::from_seed_unchecked(&seed_bytes).expect("key pair from seed"); + let public = key_pair.public_key().as_ref(); + (seed_bytes.to_vec(), public.to_vec()) + } + + #[cfg(not(target_arch = "wasm32"))] #[test] fn test_sign_header_aad_with_generated_keys() { - let (secret, public) = generate_mls_signature_keys().unwrap(); + let (secret, public) = generate_test_ed25519_keys(); + let msg = b"hello"; + let sig = sign_header_aad(msg, &secret, &public).unwrap(); + verify_header_aad(msg, &sig, &public).unwrap(); + } + + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_sign_header_aad_with_concatenated_ed25519_private_key() { + let (seed, public) = generate_test_ed25519_keys(); + let mut concatenated = seed; + concatenated.extend_from_slice(&public); let msg = b"hello"; - let sig = sign_header_aad(msg, &secret).unwrap(); + let sig = sign_header_aad(msg, &concatenated, &public).unwrap(); verify_header_aad(msg, &sig, &public).unwrap(); } diff --git a/data-plane/core/mls/src/mls.rs b/data-plane/core/mls/src/mls.rs index f7c7dd86c..f1ebba745 100644 --- a/data-plane/core/mls/src/mls.rs +++ b/data-plane/core/mls/src/mls.rs @@ -125,6 +125,13 @@ where } } + /// Identity provider whose MLS signature keys were installed during + /// [`Self::initialize`]. Distinct from session-level clones when `P` stores + /// keys per clone (e.g. [`SharedSecret`]). + pub fn identity_provider(&self) -> &P { + &self.identity_provider + } + /// Creates a signing identity from the keys stored in the identity provider. /// The provider must have had its MLS keys set via `set_signature_keys`. fn create_signing_identity( diff --git a/data-plane/core/session/src/mls_state.rs b/data-plane/core/session/src/mls_state.rs index dee14c52d..1ea456eb3 100644 --- a/data-plane/core/session/src/mls_state.rs +++ b/data-plane/core/session/src/mls_state.rs @@ -303,6 +303,7 @@ where session_id: session_header.get_session_id(), message_id: session_header.get_message_id(), payload_type, + sequence_number: slim_header.sequence_number, }; aad.encode_to_vec() diff --git a/data-plane/core/session/src/session_moderator.rs b/data-plane/core/session/src/session_moderator.rs index b859403ac..f245b6336 100644 --- a/data-plane/core/session/src/session_moderator.rs +++ b/data-plane/core/session/src/session_moderator.rs @@ -347,11 +347,12 @@ where { #[maybe_async::maybe_async] async fn encrypt_output(&mut self, output: &mut SessionOutput) -> Result<(), SessionError> { - crate::session_controller::SessionController::apply_identity_to_slim_output( - output, - &self.common.settings.identity_provider, - )?; - if self.mls_state.is_some() { + if let Some(mls_state) = &self.mls_state { + let identity_provider = mls_state.common.mls.identity_provider(); + crate::session_controller::SessionController::apply_identity_to_slim_output( + output, + identity_provider, + )?; for msg in &mut output.messages { if let OutboundMessage::ToSlim(m) = msg && m.get_session_message_type().is_command_message() @@ -360,15 +361,18 @@ where self.common.next_control_seq += 1; let aad = crate::mls_state::build_aad(m); - let private_key = self - .common - .settings - .identity_provider - .get_signature_secret_key()?; - let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; + let private_key = identity_provider.get_signature_secret_key()?; + let public_key = identity_provider.get_signature_public_key()?; + let signature = + slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; m.get_slim_header_mut().e2e_header_sig = Some(signature); } } + } else { + crate::session_controller::SessionController::apply_identity_to_slim_output( + output, + &self.common.settings.identity_provider, + )?; } if let Some(mls_state) = &mut self.mls_state { mls_state.common.encrypt_output(output).await?; diff --git a/data-plane/core/session/src/session_participant.rs b/data-plane/core/session/src/session_participant.rs index 034aa276f..c14490dfc 100644 --- a/data-plane/core/session/src/session_participant.rs +++ b/data-plane/core/session/src/session_participant.rs @@ -300,11 +300,12 @@ where { #[maybe_async::maybe_async] async fn encrypt_output(&mut self, output: &mut SessionOutput) -> Result<(), SessionError> { - crate::session_controller::SessionController::apply_identity_to_slim_output( - output, - &self.common.settings.identity_provider, - )?; - if self.mls_state.is_some() { + if let Some(mls_state) = &self.mls_state { + let identity_provider = mls_state.mls.identity_provider(); + crate::session_controller::SessionController::apply_identity_to_slim_output( + output, + identity_provider, + )?; for msg in &mut output.messages { if let OutboundMessage::ToSlim(m) = msg && m.get_session_message_type().is_command_message() @@ -313,15 +314,18 @@ where self.common.next_control_seq += 1; let aad = crate::mls_state::build_aad(m); - let private_key = self - .common - .settings - .identity_provider - .get_signature_secret_key()?; - let signature = slim_auth::utils::sign_header_aad(&aad, &private_key)?; + let private_key = identity_provider.get_signature_secret_key()?; + let public_key = identity_provider.get_signature_public_key()?; + let signature = + slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; m.get_slim_header_mut().e2e_header_sig = Some(signature); } } + } else { + crate::session_controller::SessionController::apply_identity_to_slim_output( + output, + &self.common.settings.identity_provider, + )?; } if let Some(mls_state) = &mut self.mls_state { mls_state.encrypt_output(output).await?; From 597d2159476dfd203e724f95a61dc12235b7487a Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Tue, 16 Jun 2026 15:08:49 +0200 Subject: [PATCH 08/12] fix: fix linting Signed-off-by: Mark Marton --- data-plane/core/auth/src/utils.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 46eda92ca..1183d4950 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -77,7 +77,7 @@ fn sign_header_aad_p256( public_key_bytes: &[u8], ) -> Result, crate::errors::AuthError> { use aws_lc_rs::rand::SystemRandom; - use aws_lc_rs::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_ASN1_SIGNING}; + use aws_lc_rs::signature::{ECDSA_P256_SHA256_ASN1_SIGNING, EcdsaKeyPair}; let key_pair = EcdsaKeyPair::from_private_key_and_public_key( &ECDSA_P256_SHA256_ASN1_SIGNING, @@ -98,8 +98,8 @@ fn sign_header_aad_p256( private_key_bytes: &[u8], public_key_bytes: &[u8], ) -> Result, crate::errors::AuthError> { - use p256::ecdsa::signature::Signer as _; use p256::ecdsa::Signature; + use p256::ecdsa::signature::Signer as _; let signing_key = p256_signing_key(private_key_bytes, public_key_bytes)?; let signature: Signature = signing_key.sign(aad_bytes); @@ -164,8 +164,8 @@ fn verify_header_aad_p256( signature_bytes: &[u8], public_key_bytes: &[u8], ) -> Result<(), crate::errors::AuthError> { - use p256::ecdsa::signature::Verifier; use p256::ecdsa::Signature; + use p256::ecdsa::signature::Verifier; let verifying_key = p256_verifying_key(public_key_bytes)?; let signature = @@ -231,9 +231,9 @@ fn p256_signing_key( fn p256_verifying_key( public_key_bytes: &[u8], ) -> Result { + use p256::EncodedPoint; use p256::ecdsa::VerifyingKey; use p256::elliptic_curve::sec1::FromEncodedPoint; - use p256::EncodedPoint; let point = EncodedPoint::from_bytes(public_key_bytes) .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; From 30985caaabef45977da98c18b01b6728a6917168 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Wed, 17 Jun 2026 07:15:44 +0200 Subject: [PATCH 09/12] fix: remove wasm32 target condition and generalize sign/verify functions Signed-off-by: Mark Marton --- data-plane/core/auth/Cargo.toml | 4 +- data-plane/core/auth/src/utils.rs | 96 +------------------ .../core/session/src/session_controller.rs | 23 +++++ .../core/session/src/session_moderator.rs | 18 +--- .../core/session/src/session_participant.rs | 18 +--- 5 files changed, 34 insertions(+), 125 deletions(-) diff --git a/data-plane/core/auth/Cargo.toml b/data-plane/core/auth/Cargo.toml index dccae7e20..c58d84ea7 100644 --- a/data-plane/core/auth/Cargo.toml +++ b/data-plane/core/auth/Cargo.toml @@ -18,8 +18,10 @@ default = [] agntcy-slim-version = { workspace = true } base64 = { workspace = true } cfg-if = { workspace = true } +ed25519-dalek = { version = "2.1", default-features = false, features = ["rand_core"] } http = { workspace = true } itoa = "1" +p256 = { version = "0.13", default-features = false, features = ["ecdsa", "pkcs8", "std"] } parking_lot = { workspace = true } prost-types = { workspace = true } rand = { workspace = true } @@ -56,10 +58,8 @@ wiremock = { workspace = true } # native-only). `getrandom` with the wasm_js backend is required so that # `rand` works in the browser. [target.'cfg(target_arch = "wasm32")'.dependencies] -ed25519-dalek = { version = "2.1", default-features = false, features = ["rand_core"] } getrandom = { workspace = true } hmac = { workspace = true } -p256 = { version = "0.13", default-features = false, features = ["ecdsa", "pkcs8", "std"] } sha2 = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 1183d4950..2d6413a0b 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -34,31 +34,6 @@ pub fn verify_header_aad( } } -#[cfg(not(target_arch = "wasm32"))] -fn sign_header_aad_ed25519( - aad_bytes: &[u8], - private_key_bytes: &[u8], - public_key_bytes: &[u8], -) -> Result, crate::errors::AuthError> { - use aws_lc_rs::signature::Ed25519KeyPair; - - // MLS may store Ed25519 secrets either as a 32-byte seed with the public key - // kept separately, or as a 64-byte seed||public concatenation (see - // mls-rs-crypto-awslc x509 import paths). - let key_pair = if private_key_bytes.len() >= 64 { - Ed25519KeyPair::from_seed_and_public_key( - &private_key_bytes[..32], - &private_key_bytes[32..64], - ) - } else { - Ed25519KeyPair::from_seed_and_public_key(private_key_bytes, public_key_bytes) - } - .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; - let signature = key_pair.sign(aad_bytes); - Ok(signature.as_ref().to_vec()) -} - -#[cfg(target_arch = "wasm32")] fn sign_header_aad_ed25519( aad_bytes: &[u8], private_key_bytes: &[u8], @@ -70,29 +45,6 @@ fn sign_header_aad_ed25519( Ok(signing_key.sign(aad_bytes).to_bytes().to_vec()) } -#[cfg(not(target_arch = "wasm32"))] -fn sign_header_aad_p256( - aad_bytes: &[u8], - private_key_bytes: &[u8], - public_key_bytes: &[u8], -) -> Result, crate::errors::AuthError> { - use aws_lc_rs::rand::SystemRandom; - use aws_lc_rs::signature::{ECDSA_P256_SHA256_ASN1_SIGNING, EcdsaKeyPair}; - - let key_pair = EcdsaKeyPair::from_private_key_and_public_key( - &ECDSA_P256_SHA256_ASN1_SIGNING, - private_key_bytes, - public_key_bytes, - ) - .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; - let rng = SystemRandom::new(); - let signature = key_pair - .sign(&rng, aad_bytes) - .map_err(|_| crate::errors::AuthError::MlsKeyGenerationFailed)?; - Ok(signature.as_ref().to_vec()) -} - -#[cfg(target_arch = "wasm32")] fn sign_header_aad_p256( aad_bytes: &[u8], private_key_bytes: &[u8], @@ -106,21 +58,6 @@ fn sign_header_aad_p256( Ok(signature.to_der().as_bytes().to_vec()) } -#[cfg(not(target_arch = "wasm32"))] -fn verify_header_aad_ed25519( - aad_bytes: &[u8], - signature_bytes: &[u8], - public_key_bytes: &[u8], -) -> Result<(), crate::errors::AuthError> { - use aws_lc_rs::signature::{ED25519, UnparsedPublicKey}; - - let public_key = UnparsedPublicKey::new(&ED25519, public_key_bytes); - public_key - .verify(aad_bytes, signature_bytes) - .map_err(|_| crate::errors::AuthError::TokenInvalid) -} - -#[cfg(target_arch = "wasm32")] fn verify_header_aad_ed25519( aad_bytes: &[u8], signature_bytes: &[u8], @@ -144,21 +81,6 @@ fn verify_header_aad_ed25519( .map_err(|_| crate::errors::AuthError::TokenInvalid) } -#[cfg(not(target_arch = "wasm32"))] -fn verify_header_aad_p256( - aad_bytes: &[u8], - signature_bytes: &[u8], - public_key_bytes: &[u8], -) -> Result<(), crate::errors::AuthError> { - use aws_lc_rs::signature::{ECDSA_P256_SHA256_ASN1, UnparsedPublicKey}; - - let public_key = UnparsedPublicKey::new(&ECDSA_P256_SHA256_ASN1, public_key_bytes); - public_key - .verify(aad_bytes, signature_bytes) - .map_err(|_| crate::errors::AuthError::TokenInvalid) -} - -#[cfg(target_arch = "wasm32")] fn verify_header_aad_p256( aad_bytes: &[u8], signature_bytes: &[u8], @@ -175,7 +97,6 @@ fn verify_header_aad_p256( .map_err(|_| crate::errors::AuthError::TokenInvalid) } -#[cfg(target_arch = "wasm32")] fn ed25519_signing_key( private_key_bytes: &[u8], public_key_bytes: &[u8], @@ -204,7 +125,6 @@ fn ed25519_signing_key( } } -#[cfg(target_arch = "wasm32")] fn p256_signing_key( private_key_bytes: &[u8], public_key_bytes: &[u8], @@ -227,7 +147,6 @@ fn p256_signing_key( } } -#[cfg(target_arch = "wasm32")] fn p256_verifying_key( public_key_bytes: &[u8], ) -> Result { @@ -270,21 +189,17 @@ pub fn bytes_to_pem(key_bytes: &[u8], header: &str, footer: &str) -> String { mod tests { use super::*; - #[cfg(not(target_arch = "wasm32"))] fn generate_test_ed25519_keys() -> (Vec, Vec) { - use aws_lc_rs::rand::{SecureRandom, SystemRandom}; - use aws_lc_rs::signature::{Ed25519KeyPair, KeyPair}; + use ed25519_dalek::SigningKey; + use rand::Rng; - let rng = SystemRandom::new(); let mut seed_bytes = [0u8; 32]; - rng.fill(&mut seed_bytes).expect("rng fill"); - let key_pair = - Ed25519KeyPair::from_seed_unchecked(&seed_bytes).expect("key pair from seed"); - let public = key_pair.public_key().as_ref(); + rand::rng().fill(&mut seed_bytes); + let signing_key = SigningKey::from_bytes(&seed_bytes); + let public = signing_key.verifying_key().to_bytes(); (seed_bytes.to_vec(), public.to_vec()) } - #[cfg(not(target_arch = "wasm32"))] #[test] fn test_sign_header_aad_with_generated_keys() { let (secret, public) = generate_test_ed25519_keys(); @@ -293,7 +208,6 @@ mod tests { verify_header_aad(msg, &sig, &public).unwrap(); } - #[cfg(not(target_arch = "wasm32"))] #[test] fn test_sign_header_aad_with_concatenated_ed25519_private_key() { let (seed, public) = generate_test_ed25519_keys(); diff --git a/data-plane/core/session/src/session_controller.rs b/data-plane/core/session/src/session_controller.rs index ec8043830..47edb4804 100644 --- a/data-plane/core/session/src/session_controller.rs +++ b/data-plane/core/session/src/session_controller.rs @@ -773,6 +773,29 @@ where } } + pub(crate) fn sign_control_messages( + &mut self, + output: &mut SessionOutput, + identity_provider: &P, + ) -> Result<(), SessionError> { + for msg in &mut output.messages { + if let OutboundMessage::ToSlim(m) = msg + && m.get_session_message_type().is_command_message() + { + m.get_slim_header_mut().sequence_number = Some(self.next_control_seq); + self.next_control_seq += 1; + + let aad = crate::mls_state::build_aad(m); + let private_key = identity_provider.get_signature_secret_key()?; + let public_key = identity_provider.get_signature_public_key()?; + let signature = + slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; + m.get_slim_header_mut().e2e_header_sig = Some(signature); + } + } + Ok(()) + } + /// Send control message through ControllerSender, returning the output. pub(crate) fn send_with_timer( &mut self, diff --git a/data-plane/core/session/src/session_moderator.rs b/data-plane/core/session/src/session_moderator.rs index f245b6336..3c170b6b6 100644 --- a/data-plane/core/session/src/session_moderator.rs +++ b/data-plane/core/session/src/session_moderator.rs @@ -22,7 +22,7 @@ use tokio::sync::oneshot; use tracing::debug; use crate::{ - common::{MessageDirection, OutboundMessage, SessionMessage, SessionOutput}, + common::{MessageDirection, SessionMessage, SessionOutput}, errors::SessionError, mls_state::{MlsModeratorState, MlsState}, moderator_task::{ @@ -353,21 +353,7 @@ where output, identity_provider, )?; - for msg in &mut output.messages { - if let OutboundMessage::ToSlim(m) = msg - && m.get_session_message_type().is_command_message() - { - m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); - self.common.next_control_seq += 1; - - let aad = crate::mls_state::build_aad(m); - let private_key = identity_provider.get_signature_secret_key()?; - let public_key = identity_provider.get_signature_public_key()?; - let signature = - slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; - m.get_slim_header_mut().e2e_header_sig = Some(signature); - } - } + self.common.sign_control_messages(output, identity_provider)?; } else { crate::session_controller::SessionController::apply_identity_to_slim_output( output, diff --git a/data-plane/core/session/src/session_participant.rs b/data-plane/core/session/src/session_participant.rs index c14490dfc..3e9e6b01c 100644 --- a/data-plane/core/session/src/session_participant.rs +++ b/data-plane/core/session/src/session_participant.rs @@ -16,7 +16,7 @@ use slim_mls::mls::Mls; use tracing::debug; use crate::{ - common::{MessageDirection, OutboundMessage, SessionMessage, SessionOutput}, + common::{MessageDirection, SessionMessage, SessionOutput}, errors::SessionError, mls_state::MlsState, runtime::maybe_await, @@ -306,21 +306,7 @@ where output, identity_provider, )?; - for msg in &mut output.messages { - if let OutboundMessage::ToSlim(m) = msg - && m.get_session_message_type().is_command_message() - { - m.get_slim_header_mut().sequence_number = Some(self.common.next_control_seq); - self.common.next_control_seq += 1; - - let aad = crate::mls_state::build_aad(m); - let private_key = identity_provider.get_signature_secret_key()?; - let public_key = identity_provider.get_signature_public_key()?; - let signature = - slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; - m.get_slim_header_mut().e2e_header_sig = Some(signature); - } - } + self.common.sign_control_messages(output, identity_provider)?; } else { crate::session_controller::SessionController::apply_identity_to_slim_output( output, From a8df0b05b01490bff65743d05e5616d2bdaefa3c Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Wed, 17 Jun 2026 08:45:35 +0200 Subject: [PATCH 10/12] fix: fix linting Signed-off-by: Mark Marton --- data-plane/core/session/src/session_controller.rs | 3 +-- data-plane/core/session/src/session_moderator.rs | 3 ++- data-plane/core/session/src/session_participant.rs | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/data-plane/core/session/src/session_controller.rs b/data-plane/core/session/src/session_controller.rs index 47edb4804..4942b6b4c 100644 --- a/data-plane/core/session/src/session_controller.rs +++ b/data-plane/core/session/src/session_controller.rs @@ -788,8 +788,7 @@ where let aad = crate::mls_state::build_aad(m); let private_key = identity_provider.get_signature_secret_key()?; let public_key = identity_provider.get_signature_public_key()?; - let signature = - slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; + let signature = slim_auth::utils::sign_header_aad(&aad, &private_key, &public_key)?; m.get_slim_header_mut().e2e_header_sig = Some(signature); } } diff --git a/data-plane/core/session/src/session_moderator.rs b/data-plane/core/session/src/session_moderator.rs index 3c170b6b6..230031786 100644 --- a/data-plane/core/session/src/session_moderator.rs +++ b/data-plane/core/session/src/session_moderator.rs @@ -353,7 +353,8 @@ where output, identity_provider, )?; - self.common.sign_control_messages(output, identity_provider)?; + self.common + .sign_control_messages(output, identity_provider)?; } else { crate::session_controller::SessionController::apply_identity_to_slim_output( output, diff --git a/data-plane/core/session/src/session_participant.rs b/data-plane/core/session/src/session_participant.rs index 3e9e6b01c..c00562a4e 100644 --- a/data-plane/core/session/src/session_participant.rs +++ b/data-plane/core/session/src/session_participant.rs @@ -306,7 +306,8 @@ where output, identity_provider, )?; - self.common.sign_control_messages(output, identity_provider)?; + self.common + .sign_control_messages(output, identity_provider)?; } else { crate::session_controller::SessionController::apply_identity_to_slim_output( output, From 706cc830d2acabfa8fe01c0f3fc4a85ef1449ad8 Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Wed, 17 Jun 2026 09:17:38 +0200 Subject: [PATCH 11/12] fix: increase coverage Signed-off-by: Mark Marton --- data-plane/core/auth/src/utils.rs | 153 ++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/data-plane/core/auth/src/utils.rs b/data-plane/core/auth/src/utils.rs index 2d6413a0b..a95ec3c78 100644 --- a/data-plane/core/auth/src/utils.rs +++ b/data-plane/core/auth/src/utils.rs @@ -200,6 +200,19 @@ mod tests { (seed_bytes.to_vec(), public.to_vec()) } + fn generate_test_p256_keys() -> (Vec, Vec, Vec) { + use p256::SecretKey; + use p256::elliptic_curve::rand_core::OsRng; + + let secret_key = SecretKey::random(&mut OsRng); + let secret_bytes = secret_key.to_bytes().to_vec(); + let signing_key = p256::ecdsa::SigningKey::from(&secret_key); + let verifying_key = signing_key.verifying_key(); + let public_compressed = verifying_key.to_encoded_point(true).as_bytes().to_vec(); + let public_uncompressed = verifying_key.to_encoded_point(false).as_bytes().to_vec(); + (secret_bytes, public_compressed, public_uncompressed) + } + #[test] fn test_sign_header_aad_with_generated_keys() { let (secret, public) = generate_test_ed25519_keys(); @@ -218,6 +231,146 @@ mod tests { verify_header_aad(msg, &sig, &public).unwrap(); } + #[test] + fn test_sign_verify_p256_compressed() { + let (secret, public_comp, _) = generate_test_p256_keys(); + let msg = b"hello p256 compressed"; + let sig = sign_header_aad(msg, &secret, &public_comp).unwrap(); + verify_header_aad(msg, &sig, &public_comp).unwrap(); + } + + #[test] + fn test_sign_verify_p256_uncompressed() { + let (secret, _, public_uncomp) = generate_test_p256_keys(); + let msg = b"hello p256 uncompressed"; + let sig = sign_header_aad(msg, &secret, &public_uncomp).unwrap(); + verify_header_aad(msg, &sig, &public_uncomp).unwrap(); + } + + #[test] + fn test_sign_verify_p256_pkcs8() { + use p256::SecretKey; + use p256::elliptic_curve::rand_core::OsRng; + use p256::pkcs8::EncodePrivateKey; + + let secret_key = SecretKey::random(&mut OsRng); + let pkcs8_der = secret_key.to_pkcs8_der().unwrap().to_bytes().to_vec(); + + let signing_key = p256::ecdsa::SigningKey::from(&secret_key); + let verifying_key = signing_key.verifying_key(); + let public_comp = verifying_key.to_encoded_point(true).as_bytes().to_vec(); + + let msg = b"hello p256 pkcs8"; + let sig = sign_header_aad(msg, &pkcs8_der, &public_comp).unwrap(); + verify_header_aad(msg, &sig, &public_comp).unwrap(); + } + + #[test] + fn test_invalid_key_lengths() { + let msg = b"invalid keys"; + // Invalid public key length + let res_sign = sign_header_aad(msg, &[0; 32], &[0; 10]); + assert!(matches!( + res_sign, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + let res_verify = verify_header_aad(msg, &[0; 64], &[0; 10]); + assert!(matches!( + res_verify, + Err(crate::errors::AuthError::TokenInvalid) + )); + } + + #[test] + fn test_ed25519_key_errors() { + let (secret, public) = generate_test_ed25519_keys(); + let msg = b"ed25519 error tests"; + + // Mismatched public key + let mut wrong_public = public.clone(); + wrong_public[0] ^= 0xFF; + let res_sign = sign_header_aad(msg, &secret, &wrong_public); + assert!(matches!( + res_sign, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Invalid private key length (neither 32 nor >=64) + let res_sign_len = sign_header_aad(msg, &[0; 16], &public); + assert!(matches!( + res_sign_len, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Invalid signature verification + let sig = sign_header_aad(msg, &secret, &public).unwrap(); + let mut bad_sig = sig.clone(); + bad_sig[0] ^= 0xFF; + let res_verify = verify_header_aad(msg, &bad_sig, &public); + assert!(matches!( + res_verify, + Err(crate::errors::AuthError::TokenInvalid) + )); + + // Signature too short + let res_verify_short = verify_header_aad(msg, &sig[..10], &public); + assert!(matches!( + res_verify_short, + Err(crate::errors::AuthError::TokenInvalid) + )); + } + + #[test] + fn test_p256_key_errors() { + let (secret, public_comp, _) = generate_test_p256_keys(); + let msg = b"p256 error tests"; + + // Mismatched public key + let mut wrong_public = public_comp.clone(); + // Modify to make it a valid but different public key point if possible, or just invalid/mismatched. + wrong_public[1] ^= 0xFF; + let res_sign = sign_header_aad(msg, &secret, &wrong_public); + assert!(matches!( + res_sign, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Valid point, but mismatched public key + let (_, other_public, _) = generate_test_p256_keys(); + let res_sign_mismatch = sign_header_aad(msg, &secret, &other_public); + assert!(matches!( + res_sign_mismatch, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Invalid private key bytes (32 bytes but all zeros - not a valid scalar) + let res_sign_zero = sign_header_aad(msg, &[0; 32], &public_comp); + assert!(matches!( + res_sign_zero, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Invalid PKCS8 DER private key + let res_sign_der = sign_header_aad(msg, &[0; 40], &public_comp); + assert!(matches!( + res_sign_der, + Err(crate::errors::AuthError::MlsKeyGenerationFailed) + )); + + // Invalid signature format for verification + let sig = sign_header_aad(msg, &secret, &public_comp).unwrap(); + let mut bad_sig = sig.clone(); + if !bad_sig.is_empty() { + bad_sig[0] ^= 0xFF; + } + let res_verify = verify_header_aad(msg, &bad_sig, &public_comp); + assert!(matches!( + res_verify, + Err(crate::errors::AuthError::TokenInvalid) + )); + } + #[test] fn test_bytes_to_pem_basic() { let data = b"hello world"; // base64: aGVsbG8gd29ybGQ= From 3633e53a69195f79e1c711633c168d07f0a36acd Mon Sep 17 00:00:00 2001 From: Mark Marton Date: Wed, 17 Jun 2026 09:53:47 +0200 Subject: [PATCH 12/12] fix: fix multicast test race condition Signed-off-by: Mark Marton --- data-plane/core/service/src/app.rs | 44 ++++++++++++++---------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/data-plane/core/service/src/app.rs b/data-plane/core/service/src/app.rs index 0ca632406..72fdefe7e 100644 --- a/data-plane/core/service/src/app.rs +++ b/data-plane/core/service/src/app.rs @@ -1194,34 +1194,30 @@ mod tests { .await .unwrap(); - // Give some time for messages to be processed - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - - // Collect received session notifications from all participants + // Wait for each participant to receive its session notification. let mut total_received_sessions = 0; for (i, mut notifications) in participant_notifications.into_iter().enumerate() { - let mut participant_sessions = Vec::new(); - - while let Ok(notification) = notifications.try_recv() { - match notification.unwrap() { - slim_session::notification::Notification::NewSession(session_ctx) => { - participant_sessions.push(session_ctx); + let received_session = + tokio::time::timeout(std::time::Duration::from_secs(10), async { + loop { + match notifications.recv().await { + Some(Ok(slim_session::notification::Notification::NewSession( + session_ctx, + ))) => return session_ctx, + Some(Ok(_)) => continue, + Some(Err(e)) => { + panic!("Participant {i} notification error: {e}"); + } + None => panic!("Participant {i} notification channel closed"), + } } - _ => continue, - } - } - - // Each participant should receive exactly one session notification - assert_eq!( - participant_sessions.len(), - 1, - "Participant {} should receive exactly 1 session", - i - ); + }) + .await + .unwrap_or_else(|_| { + panic!("timeout waiting for session notification for participant {i}") + }); - // Verify session information for this participant - let received_session = &participant_sessions[0]; let session_arc = received_session.session_arc().unwrap(); // Verify it's a multicast session @@ -1232,7 +1228,7 @@ mod tests { let expected_dst = channel_name.clone().with_id(NameId::DATA_CHANNEL_ID); assert_eq!(dst, &expected_dst); - total_received_sessions += participant_sessions.len(); + total_received_sessions += 1; } // Verify total number of session notifications matches number of participants