From 0d33dae9cd1648dcea801fb2382eff5aceba3f45 Mon Sep 17 00:00:00 2001 From: topocount <17910833+topocount@users.noreply.github.com> Date: Mon, 23 Mar 2026 09:57:11 -0500 Subject: [PATCH] fix: implement per-fid rate limits to prevent abuse --- docs/pr-per-fid-mempool-limit.md | 29 ++ docs/runbooks/high-message-commit-rate.md | 84 ++++ src/mempool/mempool.rs | 78 +++- src/mempool/mempool_test.rs | 537 +++++++++++++++++++++- 4 files changed, 720 insertions(+), 8 deletions(-) create mode 100644 docs/pr-per-fid-mempool-limit.md create mode 100644 docs/runbooks/high-message-commit-rate.md diff --git a/docs/pr-per-fid-mempool-limit.md b/docs/pr-per-fid-mempool-limit.md new file mode 100644 index 000000000..3319f5d4c --- /dev/null +++ b/docs/pr-per-fid-mempool-limit.md @@ -0,0 +1,29 @@ +# Per-FID mempool size limit + +## Motivation + +During the Feb 26 spam incident, FID 2842822 submitted ~90K follow messages/hour. The rate limiter slowed it down but couldn't fully stop it. This adds a second layer of defense: a cap on how many of a single FID's messages can sit in the mempool at once. Once a FID fills its quota, new submissions are rejected immediately — before consuming rate limit tokens, before being gossiped, and before crowding out other FIDs. + +This is distinct from rate limiting (which limits submission *rate* over time). The mempool cap limits *concurrent pending messages*. + +## Changes + +- Added `max_mempool_messages_per_fid: u32` to `Config` (default: 100, 0 = disabled) +- Added `fid_message_counts: HashMap` to `Mempool` — one entry per FID with pending messages, incremented on insert and decremented on `pull_messages` (proposer path) or `remove_committed_txns` (non-proposer path) +- Added `fid_exceeds_mempool_limit` check in `insert_into_shard`, before `message_is_valid`, so rejected messages never touch the DB or consume rate limit tokens +- Only applies to `UserMessage` variants; validator/onchain/fname/block messages are unaffected +- New metric: `mempool.per_fid_limit_hit` (count, per shard) + +## Memory overhead + +The map holds one entry per FID with *currently pending* messages — not one per registered FID and not one per message. Between blocks it's nearly empty. Worst case (every mempool slot occupied by a different FID) is ~34 MB, which is unreachable in practice since spam incidents involve very few FIDs. + +## Performance + +Benchmarked sequential insert throughput with and without the cap, across both a normal workload (100 FIDs × 10 messages) and a single-FID spam scenario (1 FID × 500 messages, cap=50). Throughput was ~950 msg/s in all cases — identical with cap on or off. The cap check (a single HashMap lookup) adds no measurable overhead. The bottleneck is the 1ms poll interval, which yields a theoretical ceiling of ~256,000 msg/s per shard. The Feb 26 incident peaked at ~90K msg/hour (~25 msg/s), well within that headroom. + +## Running the perf tests + +``` +cargo test -p snapchain perf_ -- --ignored --nocapture +``` diff --git a/docs/runbooks/high-message-commit-rate.md b/docs/runbooks/high-message-commit-rate.md new file mode 100644 index 000000000..8d4272c4e --- /dev/null +++ b/docs/runbooks/high-message-commit-rate.md @@ -0,0 +1,84 @@ +# Snapchain High Message Commit Rate + +## Monitor configuration + +**Name:** `Snapchain - High message commit rate on {{host.name}} shard {{shard.name}}` + +**Metric:** `snaptest.engine.commit.merged_message` +- Evaluate: sum over last 5 minutes +- Group by: host, shard +- Warning threshold: > 3,000 +- Alert threshold: > 6,000 + +**Description:** +Fires when the number of user messages committed to blocks exceeds normal throughput on a single host/shard combination, which may indicate a spam campaign, a misconfigured client flooding the network, or a denial-of-service attempt against the protocol. + +- Warning (>3,000 / 5 min): Elevated commit rate — monitor closely, may resolve on its own +- Alert (>6,000 / 5 min): Sustained high throughput — likely active abuse or misconfigured submitter + +Note: This monitor fires on ALL message types. To identify which type is responsible, filter `snapchain.engine.commit.merged_message` by `message_type` tag in the Metrics Explorer (type 5 = LinkAdd/follow, type 3 = ReactionAdd, type 1 = CastAdd). + +--- + +## Runbook + +### 1. Identify the message type + +In Datadog Metrics Explorer, query: + +``` +snapchain.engine.commit.merged_message grouped by message_type, host, shard +``` + +Look for a single `message_type` dominating the spike. + +### 2. Identify the offending FID(s) + +Search logs for pruned messages during the incident window: + +``` +service:snapchain "Pruned messages" @fields.msg_type: +``` + +High prune counts for a single FID indicate that FID is saturating its store. + +Also search: + +``` +service:snapchain "rate limit exceeded for FID" +``` + +FIDs appearing frequently here are being blocked but are still submitting at high volume. + +### 3. Check if rate limits are enabled on all validators + +Confirm all validator configs have: + +```toml +[mempool] +enable_rate_limits = true +``` + +A node missing this setting will accept unlimited submissions from the offending FID and gossip them to the rest of the network. + +### 4. Identify signer keys for the FID + +``` +GET https://snap.farcaster.xyz:3381/v1/onChainSignersByFid?fid= +``` + +Note the signer keys and their `addedAt` timestamps — newly added keys near the incident start indicate intentional abuse with a fresh signer. + +### 5. Short-term mitigation + +If spam is ongoing and rate limits are not sufficient: + +- Add `enable_rate_limits = true` to any validator config missing it and restart that node +- To stop a specific FID, a code change is required: add `blocked_fids: HashSet` to `mempool::Config` and check it in `message_is_valid()` (src/mempool/mempool.rs) before the rate limit check +- A node restart clears the in-memory mempool, but messages will re-gossip from other nodes unless the source is blocked at submission + +### 6. Escalate if + +- Block commit rate is consistently near `max_messages_per_block` (1000) — legitimate messages may be getting crowded out of blocks +- The spike persists after `enable_rate_limits = true` is confirmed on all validators +- Multiple FIDs are involved simultaneously (coordinated attack) diff --git a/src/mempool/mempool.rs b/src/mempool/mempool.rs index aaf94fa74..56dee13d4 100644 --- a/src/mempool/mempool.rs +++ b/src/mempool/mempool.rs @@ -130,6 +130,7 @@ pub struct Config { pub capacity_per_shard: u64, pub rx_poll_interval: Duration, pub enable_rate_limits: bool, + pub max_mempool_messages_per_fid: u32, // 0 = disabled } impl Default for Config { @@ -140,6 +141,7 @@ impl Default for Config { capacity_per_shard: 1_000_000, rx_poll_interval: Duration::from_millis(1), enable_rate_limits: false, + max_mempool_messages_per_fid: 100, } } } @@ -489,6 +491,7 @@ pub struct Mempool { statsd_client: StatsdClientWrapper, read_node_mempool: ReadNodeMempool, rate_limits: Option, + fid_message_counts: HashMap, // fid -> pending message count across all shards } impl Mempool { @@ -507,6 +510,7 @@ impl Mempool { ) -> Self { Mempool { messages: HashMap::new(), + fid_message_counts: HashMap::new(), messages_request_rx, shard_decision_rx, block_decision_rx, @@ -550,6 +554,21 @@ impl Mempool { } } + fn fid_exceeds_mempool_limit(&self, message: &MempoolMessage) -> bool { + if self.config.max_mempool_messages_per_fid == 0 { + return false; + } + match message { + MempoolMessage::UserMessage(_) => self + .fid_message_counts + .get(&message.fid()) + .map_or(false, |count| { + *count >= self.config.max_mempool_messages_per_fid as usize + }), + _ => false, // Never limit validator/onchain/fname/block messages + } + } + fn message_already_exists(&mut self, shard: u32, message: &MempoolMessage) -> bool { self.read_node_mempool .message_already_exists(shard, message) @@ -565,6 +584,15 @@ impl Mempool { match shard_messages.pop_first() { None => break, Some((_, next_message)) => { + if matches!(next_message, MempoolMessage::UserMessage(_)) { + let fid = next_message.fid(); + if let Some(count) = self.fid_message_counts.get_mut(&fid) { + *count = count.saturating_sub(1); + if *count == 0 { + self.fid_message_counts.remove(&fid); + } + } + } let result = self.message_is_valid(request.shard_id, &next_message); if result.is_ok() { messages.push(next_message); @@ -660,9 +688,20 @@ impl Mempool { None => {} } + // Per-FID mempool cap: reject before consuming rate limit tokens + if self.fid_exceeds_mempool_limit(&message) { + self.statsd_client + .count_with_shard(shard_id, "mempool.per_fid_limit_hit", 1, vec![]); + return Err(HubError::rate_limited(&format!( + "per-fid mempool limit exceeded for FID {}", + message.fid() + ))); + } + // TODO(aditi): Maybe we don't need to run validations here? let result = self.message_is_valid(shard_id, &message); if result.is_ok() { + let fid = message.fid(); match self.messages.get_mut(&shard_id) { None => { let mut messages = BTreeMap::new(); @@ -681,6 +720,14 @@ impl Mempool { } } + // Increment per-FID count for UserMessage variants + if matches!(message, MempoolMessage::UserMessage(_)) { + self.fid_message_counts + .entry(fid) + .and_modify(|c| *c += 1) + .or_insert(1); + } + self.statsd_client .count_with_shard(shard_id, "mempool.insert.success", 1, vec![]); @@ -697,13 +744,30 @@ impl Mempool { if let Some(mempool) = self.messages.get_mut(&height.shard_index) { for transaction in transactions { for user_message in &transaction.user_messages { - mempool.remove(&user_message.mempool_key()); - self.statsd_client.count_with_shard( - height.shard_index, - "mempool.remove.success", - 1, - vec![], - ); + if let Some(_) = mempool.remove(&user_message.mempool_key()) { + // Only decrement if the message was still present (non-proposer path). + // Proposers already decremented in pull_messages. + let fid = user_message.fid(); + if let Some(count) = self.fid_message_counts.get_mut(&fid) { + *count = count.saturating_sub(1); + if *count == 0 { + self.fid_message_counts.remove(&fid); + } + } + self.statsd_client.count_with_shard( + height.shard_index, + "mempool.remove.success", + 1, + vec![], + ); + } else { + self.statsd_client.count_with_shard( + height.shard_index, + "mempool.remove.success", + 1, + vec![], + ); + } } for system_message in &transaction.system_messages { mempool.remove(&system_message.mempool_key()); diff --git a/src/mempool/mempool_test.rs b/src/mempool/mempool_test.rs index ec236e9ec..79bb698d7 100644 --- a/src/mempool/mempool_test.rs +++ b/src/mempool/mempool_test.rs @@ -31,7 +31,7 @@ mod tests { use self::test_helper::{default_custody_address, default_signer}; - use std::time::Duration; + use std::time::{Duration, Instant}; use crate::mempool::mempool::{MempoolRequest, MempoolSource}; use crate::mempool::routing::{MessageRouter, ShardRouter}; @@ -129,6 +129,81 @@ mod tests { ) } + /// Like `setup` but accepts a full `mempool::Config` instead of just `enable_rate_limits`. + /// Returns the same tuple minus the gossip fields (None gossip). + async fn setup_with_mempool_config( + mempool_config: mempool::Config, + num_shards: u32, + ) -> ( + HashMap, + BlockEngine, + Mempool, + mpsc::Sender, + mpsc::Sender, + broadcast::Sender, + broadcast::Sender, + ) { + let statsd_client = StatsdClientWrapper::new( + cadence::StatsdClient::builder("", cadence::NopMetricSink {}).build(), + true, + ); + + let (mempool_tx, mempool_rx) = mpsc::channel(10_000); + let (messages_request_tx, messages_request_rx) = mpsc::channel(100); + let (shard_decision_tx, shard_decision_rx) = broadcast::channel(100); + let (block_decision_tx, block_decision_rx) = broadcast::channel(100); + let mut shard_stores = HashMap::new(); + let mut engines = HashMap::new(); + for i in 1..num_shards + 1 { + let (engine, _) = test_helper::new_engine().await; + shard_stores.insert(i, engine.get_stores()); + engines.insert(i, engine); + } + + let (block_engine, _) = block_engine_test_helpers::setup(); + let gossip_tx = mpsc::channel(100).0; + + let mempool = Mempool::new( + mempool_config, + FarcasterNetwork::Devnet, + mempool_rx, + messages_request_rx, + num_shards, + shard_stores, + block_engine.stores(), + gossip_tx, + shard_decision_rx, + block_decision_rx, + statsd_client, + ); + + ( + engines, + block_engine, + mempool, + mempool_tx, + messages_request_tx, + shard_decision_tx, + block_decision_tx, + ) + } + + async fn send_and_await( + mempool_tx: &mpsc::Sender, + message: MempoolMessage, + ) -> Result<(), crate::core::error::HubError> { + let (tx, rx) = oneshot::channel(); + mempool_tx + .send(MempoolRequest::AddMessage( + message, + MempoolSource::Local, + Some(tx), + )) + .await + .unwrap(); + rx.await.unwrap() + } + async fn pull_message( messages_request_tx: &mpsc::Sender, shard_id: u32, @@ -895,4 +970,464 @@ mod tests { .await; pull_message(&messages_request_tx, 1, None).await; } + + // ------------------------------------------------------------------------- + // Per-FID mempool limit: logic tests + // ------------------------------------------------------------------------- + + #[tokio::test] + async fn test_per_fid_mempool_limit_blocks_excess_messages() { + let config = mempool::Config { + max_mempool_messages_per_fid: 5, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + let fid = 9999; + for i in 0..5 { + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("msg {}", i), + None, + None, + )), + ) + .await; + assert!(result.is_ok(), "message {} should be accepted", i); + } + + // 6th message for the same FID must be rejected + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add(fid, "msg 5", None, None)), + ) + .await; + assert!(result.is_err(), "6th message should be rejected"); + assert_eq!( + result.unwrap_err().code, + "bad_request.rate_limited", + "error should be rate_limited" + ); + } + + #[tokio::test] + async fn test_per_fid_mempool_limit_zero_disables_cap() { + let config = mempool::Config { + max_mempool_messages_per_fid: 0, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + let fid = 7777; + // With cap disabled, all 200 messages should be accepted + for i in 0..200 { + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("msg {}", i), + None, + None, + )), + ) + .await; + assert!( + result.is_ok(), + "message {} should be accepted when cap is 0", + i + ); + } + } + + #[tokio::test] + async fn test_per_fid_mempool_limit_different_fids_are_independent() { + let config = mempool::Config { + max_mempool_messages_per_fid: 2, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + // Fill cap for fid_a + let fid_a = 1111; + let fid_b = 2222; + for i in 0..2 { + send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add( + fid_a, + &format!("a {}", i), + None, + None, + )), + ) + .await + .unwrap(); + } + + // fid_a is now at cap; fid_b should still accept messages + for i in 0..2 { + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add( + fid_b, + &format!("b {}", i), + None, + None, + )), + ) + .await; + assert!( + result.is_ok(), + "fid_b message {} should be independent of fid_a cap", + i + ); + } + + // fid_a should be rejected + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add(fid_a, "a overflow", None, None)), + ) + .await; + assert!(result.is_err(), "fid_a should be at cap"); + } + + #[tokio::test] + async fn test_per_fid_mempool_limit_does_not_affect_onchain_events() { + let config = mempool::Config { + max_mempool_messages_per_fid: 1, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + let fid = 5555; + + // Fill the user-message cap + send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add(fid, "user msg", None, None)), + ) + .await + .unwrap(); + + // Onchain events for the same FID must pass through regardless + for i in 0..5 { + let event = events_factory::create_rent_event( + fid, + i + 1, + proto::StorageUnitType::UnitType2025, + false, + proto::FarcasterNetwork::Devnet, + ); + let result = send_and_await(&mempool_tx, MempoolMessage::OnchainEvent(event)).await; + assert!( + result.is_ok(), + "onchain event {} should not be blocked by user-message cap", + i + ); + } + } + + #[tokio::test] + async fn test_per_fid_mempool_limit_count_drains_after_shard_commit() { + let config = mempool::Config { + max_mempool_messages_per_fid: 2, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + let fid = 3333; + let cast1 = create_cast_add(fid, "msg 0", None, None); + let cast2 = create_cast_add(fid, "msg 1", None, None); + + send_and_await(&mempool_tx, MempoolMessage::UserMessage(cast1.clone())) + .await + .unwrap(); + send_and_await(&mempool_tx, MempoolMessage::UserMessage(cast2.clone())) + .await + .unwrap(); + + // At cap — next message should fail + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add(fid, "msg 2", None, None)), + ) + .await; + assert!(result.is_err(), "should be at cap before commit"); + + // Commit a shard chunk containing both messages + let chunk = ShardChunk { + header: Some(ShardHeader { + height: Some(Height { + shard_index: 1, + block_number: 1, + }), + timestamp: 0, + parent_hash: vec![], + shard_root: vec![], + }), + hash: vec![], + transactions: vec![Transaction { + fid, + user_messages: vec![cast1, cast2], + system_messages: vec![], + account_root: vec![], + }], + commits: None, + }; + shard_decision_tx.send(chunk).unwrap(); + + // Give the mempool time to process the commit + tokio::time::sleep(Duration::from_millis(50)).await; + + // Count should be drained — new messages should be accepted again + let result = send_and_await( + &mempool_tx, + MempoolMessage::UserMessage(create_cast_add(fid, "msg after commit", None, None)), + ) + .await; + assert!( + result.is_ok(), + "should accept new messages after commit drains the count" + ); + } + + // ------------------------------------------------------------------------- + // Per-FID mempool limit: performance comparison tests + // Run with: cargo test perf_ -- --ignored --nocapture + // ------------------------------------------------------------------------- + + async fn measure_insert_throughput( + mempool_tx: &mpsc::Sender, + messages: Vec, + ) -> (Duration, usize) { + let total = messages.len(); + let start = Instant::now(); + let mut accepted = 0; + for msg in messages { + if send_and_await(mempool_tx, msg).await.is_ok() { + accepted += 1; + } + } + (start.elapsed(), accepted) + } + + #[tokio::test] + #[ignore = "performance test – run with: cargo test perf_ -- --ignored --nocapture"] + async fn test_perf_many_fids_no_cap() { + let config = mempool::Config { + max_mempool_messages_per_fid: 0, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + const FIDS: u64 = 100; + const MSGS_PER_FID: usize = 10; + let messages: Vec<_> = (1..=FIDS) + .flat_map(|fid| { + (0..MSGS_PER_FID).map(move |i| { + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("m{}", i), + None, + None, + )) + }) + }) + .collect(); + + let (elapsed, accepted) = measure_insert_throughput(&mempool_tx, messages).await; + let total = FIDS as usize * MSGS_PER_FID; + eprintln!( + "[no cap ] many-FIDs: {}/{} accepted in {:?} ({:.0} msg/s)", + accepted, + total, + elapsed, + total as f64 / elapsed.as_secs_f64() + ); + } + + #[tokio::test] + #[ignore = "performance test – run with: cargo test perf_ -- --ignored --nocapture"] + async fn test_perf_many_fids_with_cap() { + let config = mempool::Config { + max_mempool_messages_per_fid: 100, // cap well above MSGS_PER_FID so nobody is rejected + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + const FIDS: u64 = 100; + const MSGS_PER_FID: usize = 10; + let messages: Vec<_> = (1..=FIDS) + .flat_map(|fid| { + (0..MSGS_PER_FID).map(move |i| { + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("m{}", i), + None, + None, + )) + }) + }) + .collect(); + + let (elapsed, accepted) = measure_insert_throughput(&mempool_tx, messages).await; + let total = FIDS as usize * MSGS_PER_FID; + eprintln!( + "[cap=100 ] many-FIDs: {}/{} accepted in {:?} ({:.0} msg/s)", + accepted, + total, + elapsed, + total as f64 / elapsed.as_secs_f64() + ); + } + + #[tokio::test] + #[ignore = "performance test – run with: cargo test perf_ -- --ignored --nocapture"] + async fn test_perf_single_fid_spam_no_cap() { + let config = mempool::Config { + max_mempool_messages_per_fid: 0, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + const N: usize = 500; + let fid = 42; + let messages: Vec<_> = (0..N) + .map(|i| { + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("spam {}", i), + None, + None, + )) + }) + .collect(); + + let (elapsed, accepted) = measure_insert_throughput(&mempool_tx, messages).await; + eprintln!( + "[no cap ] single-FID spam: {}/{} accepted in {:?} ({:.0} msg/s)", + accepted, + N, + elapsed, + N as f64 / elapsed.as_secs_f64() + ); + } + + #[tokio::test] + #[ignore = "performance test – run with: cargo test perf_ -- --ignored --nocapture"] + async fn test_perf_single_fid_spam_with_cap() { + const CAP: u32 = 50; + let config = mempool::Config { + max_mempool_messages_per_fid: CAP, + ..Default::default() + }; + let ( + _, + _, + mut mempool, + mempool_tx, + _messages_request_tx, + _shard_decision_tx, + _block_decision_tx, + ) = setup_with_mempool_config(config, 1).await; + tokio::spawn(async move { mempool.run().await }); + + const N: usize = 500; + let fid = 42; + let messages: Vec<_> = (0..N) + .map(|i| { + MempoolMessage::UserMessage(create_cast_add( + fid, + &format!("spam {}", i), + None, + None, + )) + }) + .collect(); + + let (elapsed, accepted) = measure_insert_throughput(&mempool_tx, messages).await; + eprintln!( + "[cap={CAP:<4}] single-FID spam: {}/{} accepted in {:?} ({:.0} msg/s, {} rejected fast)", + accepted, + N, + elapsed, + N as f64 / elapsed.as_secs_f64(), + N - accepted, + ); + } }