Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions data-plane/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ serde-pyobject = "0.7"
serde_json = "1.0"
serde_yaml = "0.9.34"
spiffe = { version = "0.12.0", features = ["x509-source", "jwt-source", "jwt-verify-aws-lc-rs"] }
stats_alloc = "0.1.10"
tempfile = "3"
test-fork = "0.1.3"
thiserror = "2.0.9"
Expand Down
8 changes: 8 additions & 0 deletions data-plane/Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ tasks:
cmds:
- git describe --tags --match "slim-v*" | cut -d 'v' -f 2

data-plane:bench:process-stream:
desc: "Run process_stream Criterion bench and allocation-counter bench"
cmds:
- cargo bench -p agntcy-slim-datapath --bench process_stream_benchmark {{.ARGS}}
- cargo bench -p agntcy-slim-datapath --bench process_stream_alloc {{.ARGS}}
vars:
ARGS: '{{.ARGS | default ""}}'

data-plane:generate:grpc-json-schema:
desc: "Generate the gRPC schema"
cmds:
Expand Down
5 changes: 5 additions & 0 deletions data-plane/core/datapath/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ tonic-prost-build = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
stats_alloc = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-test = { workspace = true, features = ["no-env-filter"] }

[[bench]]
name = "pool_benchmark"
harness = false

[[bench]]
name = "process_stream_benchmark"
harness = false

[[bench]]
name = "name_benchmark"
harness = false
53 changes: 53 additions & 0 deletions data-plane/core/datapath/benches/process_stream_alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0

//! Allocation counters for one `bench_process_stream` iteration (CP mirror path), using
//! [`stats_alloc`](https://docs.rs/stats_alloc).

mod process_stream_common;

use std::alloc::System;

use stats_alloc::{INSTRUMENTED_SYSTEM, Region, StatsAlloc};

#[global_allocator]
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;

fn main() {
let rt = process_stream_common::runtime();
rt.block_on(process_stream_common::assert_cp_mirror_rebuild_path());

const WARMUP: usize = 3;
for _ in 0..WARMUP {
rt.block_on(process_stream_common::one_iteration_cp_mirror());
}

const SAMPLES: usize = 50;
let mut allocation_counts = Vec::with_capacity(SAMPLES);
let mut bytes_allocated = Vec::with_capacity(SAMPLES);
let mut deallocations = Vec::with_capacity(SAMPLES);
for _ in 0..SAMPLES {
let region = Region::new(GLOBAL);
rt.block_on(process_stream_common::one_iteration_cp_mirror());
let s = region.change();
allocation_counts.push(s.allocations);
bytes_allocated.push(s.bytes_allocated);
deallocations.push(s.deallocations);
}

fn summarize(values: &[usize]) -> (usize, usize, f64) {
let min = *values.iter().min().unwrap_or(&0);
let max = *values.iter().max().unwrap_or(&0);
let mean = values.iter().sum::<usize>() as f64 / values.len().max(1) as f64;
(min, max, mean)
}

let (a_min, a_max, a_mean) = summarize(&allocation_counts);
let (b_min, b_max, b_mean) = summarize(&bytes_allocated);
let (d_min, d_max, d_mean) = summarize(&deallocations);

println!(
"\nprocess_stream_alloc: {} samples (one CP-mirror iteration each)\n allocations: min={} max={} mean={:.1}\n bytes_allocated (gross): min={} max={} mean={:.1}\n deallocations: min={} max={} mean={:.1}",
SAMPLES, a_min, a_max, a_mean, b_min, b_max, b_mean, d_min, d_max, d_mean
);
}
31 changes: 31 additions & 0 deletions data-plane/core/datapath/benches/process_stream_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0

//! Measures [`MessageProcessor::bench_process_stream`] when remote subscribe/unsubscribe is mirrored
//! to the control plane via [`ProtoMessage::rebuild_header_for_control_plane`] (compact header +
//! `recv_from` = dataplane connection index), with `unwrap_or_else(|| msg.clone())` only as fallback.
//!
//! Setup: `is_local = false`, `from_control_plane = false`, mock CP channel from
//! [`MessageProcessor::bench_set_control_plane_tx`].
//!
//! For heap allocation counters over the same workload, run
//! `cargo bench -p agntcy-slim-datapath --bench process_stream_alloc` or
//! `task data-plane:bench:process-stream`.

mod process_stream_common;

use criterion::{Criterion, black_box, criterion_group, criterion_main};

fn bench_process_stream_cp_mirror_rebuild(c: &mut Criterion) {
let rt = process_stream_common::runtime();
rt.block_on(process_stream_common::assert_cp_mirror_rebuild_path());

c.bench_function("process_stream: CP mirror (rebuild_header)", |b| {
b.iter(|| {
rt.block_on(black_box(process_stream_common::one_iteration_cp_mirror()));
});
});
}

criterion_group!(benches, bench_process_stream_cp_mirror_rebuild);
criterion_main!(benches);
105 changes: 105 additions & 0 deletions data-plane/core/datapath/benches/process_stream_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0

//! Shared setup for `process_stream` benchmarks (Criterion timing and allocation harness).

use std::sync::OnceLock;
use std::time::Duration;

use slim_datapath::api::{ProtoMessage, ProtoName};
use slim_datapath::message_processing::MessageProcessor;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

pub fn runtime() -> &'static Runtime {
static RT: OnceLock<Runtime> = OnceLock::new();
RT.get_or_init(|| Runtime::new().expect("tokio runtime"))
}

pub fn bench_destination() -> ProtoName {
ProtoName::from_strings(["org", "ns", "bench-dst"]).with_id(2)
}

/// Inbound subscribe from a remote peer (valid SLIM header so rebuild returns `Some`).
pub fn make_remote_subscribe_for_cp_mirror() -> ProtoMessage {
let source = ProtoName::from_strings(["org", "ns", "bench-src"]).with_id(1);
ProtoMessage::builder()
.source(source)
.destination(bench_destination())
.subscription_id(42)
.build_subscribe()
.expect("valid subscribe for bench")
}

pub async fn assert_cp_mirror_rebuild_path() {
let processor = MessageProcessor::new();
let (cp_tx, mut cp_rx) = mpsc::channel(8);
processor.bench_set_control_plane_tx(cp_tx);
let conn_id = processor.bench_register_remote_connection();
let (in_tx, in_rx) = mpsc::channel(4);
let cancel = CancellationToken::new();
let handle = processor
.bench_process_stream(
ReceiverStream::new(in_rx),
conn_id,
None,
cancel.clone(),
false,
false,
)
.expect("bench_process_stream");

let msg = make_remote_subscribe_for_cp_mirror();
let expected_dst = msg.get_dst();
in_tx.send(Ok(msg)).await.expect("send inbound");
let mirrored = cp_rx
.recv()
.await
.expect("control plane should receive message")
.expect("mirror Ok");

assert_eq!(
mirrored.get_dst(),
expected_dst,
"mirrored message must carry subscription destination for the control plane"
);
assert!(
mirrored.get_subscription_id().is_some(),
"mirrored subscribe should carry subscription_id"
);

cancel.cancel();
match tokio::time::timeout(Duration::from_secs(2), handle).await {
Ok(Ok(())) => {}
Ok(Err(e)) => panic!("bench_process_stream task failed: {e}"),
Err(_) => panic!("join timeout"),
}
}

pub async fn one_iteration_cp_mirror() {
let processor = MessageProcessor::new();
let (cp_tx, mut cp_rx) = mpsc::channel(8);
processor.bench_set_control_plane_tx(cp_tx);
let conn_id = processor.bench_register_remote_connection();
let (in_tx, in_rx) = mpsc::channel(4);
let cancel = CancellationToken::new();
let handle = processor
.bench_process_stream(
ReceiverStream::new(in_rx),
conn_id,
None,
cancel.clone(),
false,
false,
)
.expect("bench_process_stream");

let msg = make_remote_subscribe_for_cp_mirror();
in_tx.send(Ok(msg)).await.expect("send inbound");
let _ = cp_rx.recv().await;
cancel.cancel();
let _join: Result<Result<(), tokio::task::JoinError>, tokio::time::error::Elapsed> =
tokio::time::timeout(Duration::from_secs(2), handle).await;
}
47 changes: 44 additions & 3 deletions data-plane/core/datapath/src/message_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,24 @@ impl MessageProcessor {
*tx_guard = Some(tx);
}

/// Test/benchmark hook: installs the sender returned by [`Self::get_tx_control_plane`] so remote
/// `process_stream` loops mirror traffic without a real control-plane registration.
#[doc(hidden)]
pub fn bench_set_control_plane_tx(&self, tx: Sender<Result<Message, Status>>) {
self.set_tx_control_plane(tx);
}

/// Test/benchmark hook: inserts a remote connection (server channel) suitable for driving
/// [`Self::process_stream`] with `is_local = false`.
#[doc(hidden)]
pub fn bench_register_remote_connection(&self) -> u64 {
let (tx, _rx) = mpsc::channel(16);
let connection = Connection::new(ConnectionType::Remote, Channel::Server(tx));
self.forwarder()
.on_connection_established(connection, None)
.expect("bench remote connection")
}

fn get_tx_control_plane(&self) -> Option<Sender<Result<Message, Status>>> {
let tx_guard = self.internal.tx_control_plane.read();
tx_guard.clone()
Expand Down Expand Up @@ -1099,9 +1117,11 @@ impl MessageProcessor {
match msg.get_type() {
PublishType(_) | LinkType(_) | SubscriptionAckType(_) => {/* do nothing */}
_ => {
// send subscriptions and unsubscriptions
// to the control plane
let _ = txcp.send(Ok(msg.clone())).await;
// Control plane only needs SLIM header + subscription id for
// subscribe/unsubscribe; avoid cloning metadata / full wire message.
let cp_msg = msg
.rebuild_header_for_control_plane();
let _ = txcp.send(Ok(cp_msg.unwrap())).await;
}
}
}
Expand Down Expand Up @@ -1251,6 +1271,27 @@ impl MessageProcessor {
Ok(handle)
}

/// Benchmark/test wrapper around [`Self::process_stream`]; production code keeps using the private method.
#[doc(hidden)]
pub fn bench_process_stream(
&self,
stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
conn_index: u64,
client_config: Option<ClientConfig>,
cancellation_token: CancellationToken,
is_local: bool,
from_control_plane: bool,
) -> Result<JoinHandle<()>, DataPathError> {
self.process_stream(
stream,
conn_index,
client_config,
cancellation_token,
is_local,
from_control_plane,
)
}

fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn std::error::Error + 'static) = err_status;

Expand Down
Loading
Loading