diff --git a/data-plane/Cargo.lock b/data-plane/Cargo.lock index 1a7c1fe2e..fa014c74c 100644 --- a/data-plane/Cargo.lock +++ b/data-plane/Cargo.lock @@ -216,6 +216,7 @@ dependencies = [ "semver", "serde", "serde_json", + "stats_alloc", "thiserror 2.0.18", "tokio", "tokio-stream", @@ -3654,6 +3655,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stats_alloc" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c0e04424e733e69714ca1bbb9204c1a57f09f5493439520f9f68c132ad25eec" + [[package]] name = "strsim" version = "0.11.1" diff --git a/data-plane/Cargo.toml b/data-plane/Cargo.toml index 11194007d..a02381404 100644 --- a/data-plane/Cargo.toml +++ b/data-plane/Cargo.toml @@ -118,6 +118,7 @@ serde-pyobject = "0.7" serde_json = "1.0" serde_yaml = "0.9.34" spiffe = { version = "0.12.0", features = ["x509-source", "jwt-source", "jwt-verify-aws-lc-rs"] } +stats_alloc = "0.1.10" tempfile = "3" test-fork = "0.1.3" thiserror = "2.0.9" diff --git a/data-plane/Taskfile.yaml b/data-plane/Taskfile.yaml index d72cd3c48..d24c78768 100644 --- a/data-plane/Taskfile.yaml +++ b/data-plane/Taskfile.yaml @@ -129,6 +129,14 @@ tasks: cmds: - git describe --tags --match "slim-v*" | cut -d 'v' -f 2 + data-plane:bench:process-stream: + desc: "Run process_stream Criterion bench and allocation-counter bench" + cmds: + - cargo bench -p agntcy-slim-datapath --bench process_stream_benchmark {{.ARGS}} + - cargo bench -p agntcy-slim-datapath --bench process_stream_alloc {{.ARGS}} + vars: + ARGS: '{{.ARGS | default ""}}' + data-plane:generate:grpc-json-schema: desc: "Generate the gRPC schema" cmds: diff --git a/data-plane/core/datapath/Cargo.toml b/data-plane/core/datapath/Cargo.toml index 40db7061a..1778694fa 100644 --- a/data-plane/core/datapath/Cargo.toml +++ b/data-plane/core/datapath/Cargo.toml @@ -43,6 +43,7 @@ 
tonic-prost-build = { workspace = true }

 [dev-dependencies]
 criterion = { workspace = true }
+stats_alloc = { workspace = true }
 tokio = { workspace = true, features = ["test-util"] }
 tracing-test = { workspace = true, features = ["no-env-filter"] }
@@ -50,6 +51,15 @@ tracing-test = { workspace = true, features = ["no-env-filter"] }
 name = "pool_benchmark"
 harness = false

+[[bench]]
+name = "process_stream_benchmark"
+harness = false
+
+# Defines its own `fn main` and `#[global_allocator]`; without `harness = false` libtest replaces `main`.
+[[bench]]
+name = "process_stream_alloc"
+harness = false
+
 [[bench]]
 name = "name_benchmark"
 harness = false
diff --git a/data-plane/core/datapath/benches/process_stream_alloc.rs b/data-plane/core/datapath/benches/process_stream_alloc.rs
new file mode 100644
index 000000000..aa9847c37
--- /dev/null
+++ b/data-plane/core/datapath/benches/process_stream_alloc.rs
@@ -0,0 +1,53 @@
+// Copyright AGNTCY Contributors (https://github.com/agntcy)
+// SPDX-License-Identifier: Apache-2.0
+
+//! Allocation counters for one `bench_process_stream` iteration (CP mirror path), using
+//! [`stats_alloc`](https://docs.rs/stats_alloc).
+
+mod process_stream_common;
+
+use std::alloc::System;
+
+use stats_alloc::{INSTRUMENTED_SYSTEM, Region, StatsAlloc};
+
+#[global_allocator]
+static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;
+
+fn main() {
+    let rt = process_stream_common::runtime();
+    rt.block_on(process_stream_common::assert_cp_mirror_rebuild_path());
+
+    const WARMUP: usize = 3; // iterations executed before any sample is recorded
+    for _ in 0..WARMUP {
+        rt.block_on(process_stream_common::one_iteration_cp_mirror());
+    }
+
+    const SAMPLES: usize = 50;
+    let mut allocation_counts = Vec::with_capacity(SAMPLES);
+    let mut bytes_allocated = Vec::with_capacity(SAMPLES);
+    let mut deallocations = Vec::with_capacity(SAMPLES);
+    for _ in 0..SAMPLES {
+        let region = Region::new(GLOBAL); // counters below are deltas from this point
+        rt.block_on(process_stream_common::one_iteration_cp_mirror());
+        let s = region.change();
+        allocation_counts.push(s.allocations);
+        bytes_allocated.push(s.bytes_allocated);
+        deallocations.push(s.deallocations);
+    }
+
+    fn summarize(values: &[usize]) -> (usize, usize, f64) {
+        let min = *values.iter().min().unwrap_or(&0);
+        let max = *values.iter().max().unwrap_or(&0);
+        let mean = values.iter().sum::<usize>() as f64 / values.len().max(1) as f64;
+        (min, max, mean)
+    }
+
+    let (a_min, a_max, a_mean) = summarize(&allocation_counts);
+    let (b_min, b_max, b_mean) = summarize(&bytes_allocated);
+    let (d_min, d_max, d_mean) = summarize(&deallocations);
+
+    println!(
+        "\nprocess_stream_alloc: {} samples (one CP-mirror iteration each)\n allocations: min={} max={} mean={:.1}\n bytes_allocated (gross): min={} max={} mean={:.1}\n deallocations: min={} max={} mean={:.1}",
+        SAMPLES, a_min, a_max, a_mean, b_min, b_max, b_mean, d_min, d_max, d_mean
+    );
+}
diff --git a/data-plane/core/datapath/benches/process_stream_benchmark.rs b/data-plane/core/datapath/benches/process_stream_benchmark.rs
new file mode 100644
index 000000000..87ca8a551
--- /dev/null
+++ b/data-plane/core/datapath/benches/process_stream_benchmark.rs
@@ -0,0 +1,31 @@
+// Copyright AGNTCY Contributors
(https://github.com/agntcy) +// SPDX-License-Identifier: Apache-2.0 + +//! Measures [`MessageProcessor::bench_process_stream`] when remote subscribe/unsubscribe is mirrored +//! to the control plane via [`ProtoMessage::rebuild_header_for_control_plane`] (compact header + +//! `recv_from` = dataplane connection index), with `unwrap_or_else(|| msg.clone())` only as fallback. +//! +//! Setup: `is_local = false`, `from_control_plane = false`, mock CP channel from +//! [`MessageProcessor::bench_set_control_plane_tx`]. +//! +//! For heap allocation counters over the same workload, run +//! `cargo bench -p agntcy-slim-datapath --bench process_stream_alloc` or +//! `task data-plane:bench:process-stream`. + +mod process_stream_common; + +use criterion::{Criterion, black_box, criterion_group, criterion_main}; + +fn bench_process_stream_cp_mirror_rebuild(c: &mut Criterion) { + let rt = process_stream_common::runtime(); + rt.block_on(process_stream_common::assert_cp_mirror_rebuild_path()); + + c.bench_function("process_stream: CP mirror (rebuild_header)", |b| { + b.iter(|| { + rt.block_on(black_box(process_stream_common::one_iteration_cp_mirror())); + }); + }); +} + +criterion_group!(benches, bench_process_stream_cp_mirror_rebuild); +criterion_main!(benches); diff --git a/data-plane/core/datapath/benches/process_stream_common.rs b/data-plane/core/datapath/benches/process_stream_common.rs new file mode 100644 index 000000000..c1a77ea95 --- /dev/null +++ b/data-plane/core/datapath/benches/process_stream_common.rs @@ -0,0 +1,105 @@ +// Copyright AGNTCY Contributors (https://github.com/agntcy) +// SPDX-License-Identifier: Apache-2.0 + +//! Shared setup for `process_stream` benchmarks (Criterion timing and allocation harness). 
+
+use std::sync::OnceLock;
+use std::time::Duration;
+
+use slim_datapath::api::{ProtoMessage, ProtoName};
+use slim_datapath::message_processing::MessageProcessor;
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::sync::CancellationToken;
+
+pub fn runtime() -> &'static Runtime {
+    static RT: OnceLock<Runtime> = OnceLock::new();
+    RT.get_or_init(|| Runtime::new().expect("tokio runtime"))
+}
+
+pub fn bench_destination() -> ProtoName {
+    ProtoName::from_strings(["org", "ns", "bench-dst"]).with_id(2)
+}
+
+/// Inbound subscribe from a remote peer (valid SLIM header so rebuild returns `Some`).
+pub fn make_remote_subscribe_for_cp_mirror() -> ProtoMessage {
+    let source = ProtoName::from_strings(["org", "ns", "bench-src"]).with_id(1);
+    ProtoMessage::builder()
+        .source(source)
+        .destination(bench_destination())
+        .subscription_id(42)
+        .build_subscribe()
+        .expect("valid subscribe for bench")
+}
+
+pub async fn assert_cp_mirror_rebuild_path() {
+    let processor = MessageProcessor::new();
+    let (cp_tx, mut cp_rx) = mpsc::channel(8);
+    processor.bench_set_control_plane_tx(cp_tx);
+    let conn_id = processor.bench_register_remote_connection();
+    let (in_tx, in_rx) = mpsc::channel(4);
+    let cancel = CancellationToken::new();
+    let handle = processor
+        .bench_process_stream(
+            ReceiverStream::new(in_rx),
+            conn_id,
+            None,
+            cancel.clone(),
+            false,
+            false,
+        )
+        .expect("bench_process_stream");
+
+    let msg = make_remote_subscribe_for_cp_mirror();
+    let expected_dst = msg.get_dst();
+    in_tx.send(Ok(msg)).await.expect("send inbound");
+    let mirrored = cp_rx
+        .recv()
+        .await
+        .expect("control plane should receive message")
+        .expect("mirror Ok");
+
+    assert_eq!(
+        mirrored.get_dst(),
+        expected_dst,
+        "mirrored message must carry subscription destination for the control plane"
+    );
+    assert!(
+        mirrored.get_subscription_id().is_some(),
+        "mirrored subscribe should carry subscription_id"
+    );
+
+    cancel.cancel();
+    match tokio::time::timeout(Duration::from_secs(2), handle).await {
+        Ok(Ok(())) => {}
+        Ok(Err(e)) => panic!("bench_process_stream task failed: {e}"),
+        Err(_) => panic!("join timeout"),
+    }
+}
+
+pub async fn one_iteration_cp_mirror() {
+    let processor = MessageProcessor::new();
+    let (cp_tx, mut cp_rx) = mpsc::channel(8);
+    processor.bench_set_control_plane_tx(cp_tx);
+    let conn_id = processor.bench_register_remote_connection();
+    let (in_tx, in_rx) = mpsc::channel(4);
+    let cancel = CancellationToken::new();
+    let handle = processor
+        .bench_process_stream(
+            ReceiverStream::new(in_rx),
+            conn_id,
+            None,
+            cancel.clone(),
+            false,
+            false,
+        )
+        .expect("bench_process_stream");
+
+    let msg = make_remote_subscribe_for_cp_mirror();
+    in_tx.send(Ok(msg)).await.expect("send inbound");
+    let _ = cp_rx.recv().await; // best effort: the timed path must not panic on a closed channel
+    cancel.cancel();
+    let _join: Result<Result<(), tokio::task::JoinError>, tokio::time::error::Elapsed> =
+        tokio::time::timeout(Duration::from_secs(2), handle).await;
+}
diff --git a/data-plane/core/datapath/src/message_processing.rs b/data-plane/core/datapath/src/message_processing.rs
index 4d6f938aa..aaa2f0ffd 100644
--- a/data-plane/core/datapath/src/message_processing.rs
+++ b/data-plane/core/datapath/src/message_processing.rs
@@ -152,6 +152,24 @@ impl MessageProcessor {
         *tx_guard = Some(tx);
     }

+    /// Test/benchmark hook: installs the sender returned by [`Self::get_tx_control_plane`] so remote
+    /// `process_stream` loops mirror traffic without a real control-plane registration.
+    #[doc(hidden)]
+    pub fn bench_set_control_plane_tx(&self, tx: Sender<Result<Message, Status>>) {
+        self.set_tx_control_plane(tx);
+    }
+
+    /// Test/benchmark hook: inserts a remote connection (server channel) suitable for driving
+    /// [`Self::process_stream`] with `is_local = false`.
+    #[doc(hidden)]
+    pub fn bench_register_remote_connection(&self) -> u64 {
+        let (tx, _rx) = mpsc::channel(16);
+        let connection = Connection::new(ConnectionType::Remote, Channel::Server(tx));
+        self.forwarder()
+            .on_connection_established(connection, None)
+            .expect("bench remote connection")
+    }
+
     fn get_tx_control_plane(&self) -> Option<Sender<Result<Message, Status>>> {
         let tx_guard = self.internal.tx_control_plane.read();
         tx_guard.clone()
@@ -1099,9 +1117,15 @@ impl MessageProcessor {
                             match msg.get_type() {
                                 PublishType(_) | LinkType(_) | SubscriptionAckType(_) => {/* do nothing */}
                                 _ => {
-                                    // send subscriptions and unsubscriptions
-                                    // to the control plane
-                                    let _ = txcp.send(Ok(msg.clone())).await;
+                                    // Control plane only needs SLIM header + subscription id for
+                                    // subscribe/unsubscribe; avoid cloning metadata / full wire message.
+                                    // The rebuild returns `None` for any other type that reaches this
+                                    // arm and for messages without a header/destination: forward the
+                                    // original message then, rather than panicking in the stream task.
+                                    let cp_msg = msg
+                                        .rebuild_header_for_control_plane()
+                                        .unwrap_or_else(|| msg.clone());
+                                    let _ = txcp.send(Ok(cp_msg)).await;
                                 }
                             }
                         }
@@ -1251,6 +1275,27 @@ impl MessageProcessor {
         Ok(handle)
     }

+    /// Benchmark/test wrapper around [`Self::process_stream`]; production code keeps using the private method.
+    #[doc(hidden)]
+    pub fn bench_process_stream(
+        &self,
+        stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
+        conn_index: u64,
+        client_config: Option<ClientConfig>,
+        cancellation_token: CancellationToken,
+        is_local: bool,
+        from_control_plane: bool,
+    ) -> Result<JoinHandle<()>, DataPathError> {
+        self.process_stream(
+            stream,
+            conn_index,
+            client_config,
+            cancellation_token,
+            is_local,
+            from_control_plane,
+        )
+    }
+
     fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
         let mut err: &(dyn std::error::Error + 'static) = err_status;

diff --git a/data-plane/core/datapath/src/messages/utils.rs b/data-plane/core/datapath/src/messages/utils.rs
index 9c5e806b4..78d3fef01 100644
--- a/data-plane/core/datapath/src/messages/utils.rs
+++ b/data-plane/core/datapath/src/messages/utils.rs
@@ -876,6 +876,35 @@ impl ProtoMessage {
         }
     }

+    /// Builds a compact subscribe/unsubscribe for mirroring remote dataplane traffic to the
+    /// control plane.
+    ///
+    /// The rebuilt message uses the original destination as both source and destination, keeps
+    /// the subscription id, and carries an empty metadata map. The two optional
+    /// `ProtoSubscribe::new` / `ProtoUnsubscribe::new` arguments are passed as `None`, so this
+    /// function does not set `recv_from` or any other header flag itself.
+    ///
+    /// Returns [`None`] for non-subscribe/unsubscribe types or if the header or destination is
+    /// missing.
+    pub fn rebuild_header_for_control_plane(&self) -> Option<Self> {
+        let dst = self.try_get_slim_header()?.destination.clone()?;
+        let message_type = match &self.message_type {
+            Some(ProtoSubscribeType(s)) => {
+                // `dst` is already an owned copy, so only its first use needs a clone.
+                let mut sub = ProtoSubscribe::new(dst.clone(), dst, None, None);
+                sub.subscription_id = s.subscription_id;
+                ProtoSubscribeType(sub)
+            }
+            Some(ProtoUnsubscribeType(u)) => {
+                let mut unsub = ProtoUnsubscribe::new(dst.clone(), dst, None, None);
+                unsub.subscription_id = u.subscription_id;
+                ProtoUnsubscribeType(unsub)
+            }
+            _ => return None,
+        };
+        Some(ProtoMessage::new(HashMap::new(), message_type))
+    }
+
     /// Extracts the command payload from the message.
     ///
     /// # Errors
@@ -1638,7 +1667,7 @@ impl ProtoMessage {

 #[cfg(test)]
 mod tests {
-    use crate::api::proto::dataplane::v1::SessionMessageType;
+    use std::collections::HashMap;

     use super::*;

@@ -1785,6 +1814,132 @@ mod tests {
         );
     }

+    #[test]
+    fn rebuild_header_for_control_plane_subscribe() {
+        let source = ProtoName::from_strings(["org", "ns", "app"]).with_id(10);
+        let dst = ProtoName::from_strings(["org", "ns", "topic"]).with_id(20);
+        let sub_id = 0xfeed_beef_u64;
+        let msg = ProtoMessage::builder()
+            .source(source.clone())
+            .destination(dst.clone())
+            .subscription_id(sub_id)
+            .metadata("cp_should_drop", "yes")
+            .build_subscribe()
+            .unwrap();
+
+        let rebuilt = msg
+            .rebuild_header_for_control_plane()
+            .expect("rebuilt message");
+        assert!(rebuilt.get_metadata_map().is_empty());
+        assert!(rebuilt.is_subscribe());
+        assert!(!rebuilt.is_unsubscribe());
+        assert_eq!(rebuilt.get_subscription_id(), Some(sub_id));
+        assert_eq!(rebuilt.get_source(), dst);
+        assert_eq!(rebuilt.get_dst(), dst);
+        assert_eq!(msg.get_source(), source);
+        assert_eq!(msg.get_dst(), dst);
+    }
+
+    #[test]
+    fn rebuild_header_for_control_plane_unsubscribe() {
+        let source = ProtoName::from_strings(["org", "ns", "app"]).with_id(1);
+        let dst = ProtoName::from_strings(["org", "ns", "chan"]).with_id(2);
let sub_id = 42_u64; + let msg = ProtoMessage::builder() + .source(source) + .destination(dst.clone()) + .subscription_id(sub_id) + .metadata("noise", "data") + .build_unsubscribe() + .unwrap(); + + let rebuilt = msg + .rebuild_header_for_control_plane() + .expect("rebuilt message"); + assert!(rebuilt.get_metadata_map().is_empty()); + assert!(rebuilt.is_unsubscribe()); + assert!(!rebuilt.is_subscribe()); + assert_eq!(rebuilt.get_subscription_id(), Some(sub_id)); + assert_eq!(rebuilt.get_source(), dst); + assert_eq!(rebuilt.get_dst(), dst); + } + + #[test] + fn rebuild_header_for_control_plane_subscribe_zero_subscription_id() { + let dst = ProtoName::from_strings(["a", "b", "c"]); + let msg = ProtoMessage::builder() + .source(dst.clone()) + .destination(dst.clone()) + .subscription_id(0) + .build_subscribe() + .unwrap(); + + let rebuilt = msg.rebuild_header_for_control_plane().unwrap(); + assert_eq!(rebuilt.get_subscription_id(), None); + if let Some(ProtoSubscribeType(s)) = &rebuilt.message_type { + assert_eq!(s.subscription_id, 0); + } + } + + #[test] + fn rebuild_header_for_control_plane_publish_returns_none() { + let msg = ProtoMessage::builder() + .source(ProtoName::from_strings(["o", "n", "s"])) + .destination(ProtoName::from_strings(["o", "n", "d"])) + .application_payload("t", vec![1, 2]) + .build_publish() + .unwrap(); + + assert!(msg.rebuild_header_for_control_plane().is_none()); + } + + #[test] + fn rebuild_header_for_control_plane_link_returns_none() { + let msg = ProtoMessage::builder().build_link_negotiation("lid", "1.0.0", false); + assert!(msg.rebuild_header_for_control_plane().is_none()); + } + + #[test] + fn rebuild_header_for_control_plane_subscription_ack_returns_none() { + let msg = ProtoMessage::builder().build_subscription_ack(9, true, ""); + assert!(msg.rebuild_header_for_control_plane().is_none()); + } + + #[test] + fn rebuild_header_for_control_plane_subscribe_missing_slim_header() { + let msg = ProtoMessage::new( + HashMap::new(), + 
ProtoSubscribeType(ProtoSubscribe { + header: None, + subscription_id: 1, + }), + ); + assert!(msg.rebuild_header_for_control_plane().is_none()); + } + + #[test] + fn rebuild_header_for_control_plane_subscribe_missing_destination() { + let name = ProtoName::from_strings(["x", "y", "z"]); + let hdr = SlimHeader { + source: Some(name.clone()), + destination: None, + identity: String::new(), + fanout: 1, + recv_from: None, + forward_to: None, + incoming_conn: None, + error: None, + }; + let msg = ProtoMessage::new( + HashMap::new(), + ProtoSubscribeType(ProtoSubscribe { + header: Some(hdr), + subscription_id: 99, + }), + ); + assert!(msg.rebuild_header_for_control_plane().is_none()); + } + #[test] fn test_publish() { let source = ProtoName::from_strings(["org", "ns", "type"]).with_id(1);