29 changes: 29 additions & 0 deletions docs/pr-per-fid-mempool-limit.md
@@ -0,0 +1,29 @@
# Per-FID mempool size limit

## Motivation

During the Feb 26 spam incident, FID 2842822 submitted ~90K follow messages/hour. The rate limiter slowed it down but couldn't fully stop it. This adds a second layer of defense: a cap on how many of a single FID's messages can sit in the mempool at once. Once a FID fills its quota, new submissions are rejected immediately — before consuming rate limit tokens, before being gossiped, and before crowding out other FIDs.

This is distinct from rate limiting (which limits submission *rate* over time). The mempool cap limits *concurrent pending messages*.

## Changes

- Added `max_mempool_messages_per_fid: u32` to `Config` (default: 100, 0 = disabled)
- Added `fid_message_counts: HashMap<u64, usize>` to `Mempool` — one entry per FID with pending messages, incremented on insert and decremented on `pull_messages` (proposer path) or `remove_committed_txns` (non-proposer path)
- Added `fid_exceeds_mempool_limit` check in `insert_into_shard`, before `message_is_valid`, so rejected messages never touch the DB or consume rate limit tokens
- Only applies to `UserMessage` variants; validator/onchain/fname/block messages are unaffected
- New metric: `mempool.per_fid_limit_hit` (count, per shard)
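
The bookkeeping described in this list can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's code: `PerFidCounter`, `try_reserve`, `release`, and `tracked_fids` are hypothetical names, and the sketch folds the cap check and the increment into one call, whereas the PR checks in `fid_exceeds_mempool_limit` and increments only after `message_is_valid` succeeds.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the `fid_message_counts` bookkeeping.
pub struct PerFidCounter {
    /// Maximum pending messages per FID; 0 disables the cap.
    max_per_fid: u32,
    /// One entry per FID that currently has pending messages.
    counts: HashMap<u64, usize>,
}

impl PerFidCounter {
    pub fn new(max_per_fid: u32) -> Self {
        Self { max_per_fid, counts: HashMap::new() }
    }

    /// Records one pending message and returns true if the FID is under
    /// its cap; returns false without recording anything otherwise.
    pub fn try_reserve(&mut self, fid: u64) -> bool {
        let current = self.counts.get(&fid).copied().unwrap_or(0);
        if self.max_per_fid != 0 && current >= self.max_per_fid as usize {
            return false;
        }
        *self.counts.entry(fid).or_insert(0) += 1;
        true
    }

    /// Releases one slot for the FID and drops the entry once it reaches
    /// zero, so the map only holds FIDs with pending messages.
    pub fn release(&mut self, fid: u64) {
        if let Some(count) = self.counts.get_mut(&fid) {
            *count = count.saturating_sub(1);
            if *count == 0 {
                self.counts.remove(&fid);
            }
        }
    }

    /// Number of FIDs that currently have at least one pending message.
    pub fn tracked_fids(&self) -> usize {
        self.counts.len()
    }
}
```

Removing zero-count entries in `release` is what keeps the map close to empty between blocks.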

## Memory overhead

The map holds one entry per FID with *currently pending* messages — not one per registered FID and not one per message. Between blocks it's nearly empty. Worst case (every mempool slot occupied by a different FID) is ~34 MB, which is unreachable in practice since spam incidents involve very few FIDs.
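
A figure of this size can be sanity-checked with a rough estimate. The sketch below assumes the default `capacity_per_shard` of 1,000,000 distinct FIDs and a hash table layout with a power-of-two bucket count, at most 7/8 occupancy, and one control byte per bucket. These layout details are assumptions about the standard library's current `HashMap` implementation, not something this PR states, and `estimated_map_bytes` is a hypothetical helper. Allocator overhead is ignored.

```rust
/// Rough table size in bytes for `entries` keys of `entry_size` bytes each.
/// Assumed layout: power-of-two bucket count, at most 7/8 full,
/// one control byte per bucket.
pub fn estimated_map_bytes(entries: u64, entry_size: u64) -> u64 {
    // Smallest bucket count that keeps the load factor at or below 7/8.
    let min_buckets = (entries * 8 + 6) / 7; // ceil(entries * 8 / 7)
    let buckets = min_buckets.next_power_of_two();
    buckets * (entry_size + 1)
}
```

With 1,000,000 entries of 16 bytes each (a `u64` key plus a `usize` count on a 64-bit target), this gives 2,097,152 buckets × 17 bytes = 35,651,584 bytes, or 34 MiB.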

## Performance

Benchmarked sequential insert throughput with and without the cap, across both a normal workload (100 FIDs × 10 messages) and a single-FID spam scenario (1 FID × 500 messages, cap=50). Throughput was ~950 msg/s in all cases — identical with cap on or off. The cap check (a single HashMap lookup) adds no measurable overhead. The bottleneck is the 1ms poll interval, which yields a theoretical ceiling of ~256,000 msg/s per shard. The Feb 26 incident peaked at ~90K msg/hour (~25 msg/s), well within that headroom.

## Running the perf tests

```
cargo test -p snapchain perf_ -- --ignored --nocapture
```
84 changes: 84 additions & 0 deletions docs/runbooks/high-message-commit-rate.md
@@ -0,0 +1,84 @@
# Snapchain High Message Commit Rate

## Monitor configuration

**Name:** `Snapchain - High message commit rate on {{host.name}} shard {{shard.name}}`

**Metric:** `snapchain.engine.commit.merged_message`
- Evaluate: sum over last 5 minutes
- Group by: host, shard
- Warning threshold: > 3,000
- Alert threshold: > 6,000

**Description:**
Fires when the number of user messages committed to blocks exceeds normal throughput on a single host/shard combination, which may indicate a spam campaign, a misconfigured client flooding the network, or a denial-of-service attempt against the protocol.

- Warning (>3,000 / 5 min): Elevated commit rate — monitor closely, may resolve on its own
- Alert (>6,000 / 5 min): Sustained high throughput — likely active abuse or misconfigured submitter
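
For intuition, the 5-minute thresholds can be converted to average per-second rates. A small sketch, where `per_second` is a hypothetical helper and not part of Snapchain:

```rust
/// Average messages per second implied by `count` messages over `window_secs`.
pub fn per_second(count: u64, window_secs: u64) -> f64 {
    count as f64 / window_secs as f64
}
```

`per_second(3_000, 300)` is 10 msg/s for the warning threshold and `per_second(6_000, 300)` is 20 msg/s for the alert threshold, each per host/shard. For comparison, the ~90K messages/hour cited for the Feb 26 incident in this PR works out to `per_second(90_000, 3_600)`, or 25 msg/s. That figure is a submission rate rather than a commit rate, so the two are not directly comparable.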

Note: This monitor fires on ALL message types. To identify which type is responsible, filter `snapchain.engine.commit.merged_message` by `message_type` tag in the Metrics Explorer (type 5 = LinkAdd/follow, type 3 = ReactionAdd, type 1 = CastAdd).

---

## Runbook

### 1. Identify the message type

In Datadog Metrics Explorer, query:

```
snapchain.engine.commit.merged_message grouped by message_type, host, shard
```

Look for a single `message_type` dominating the spike.

### 2. Identify the offending FID(s)

Search logs for pruned messages during the incident window:

```
service:snapchain "Pruned messages" @fields.msg_type:<TYPE>
```

High prune counts for a single FID indicate that FID is saturating its store.

Also search:

```
service:snapchain "rate limit exceeded for FID"
```

FIDs appearing frequently here are being blocked but are still submitting at high volume.

### 3. Check if rate limits are enabled on all validators

Confirm all validator configs have:

```toml
[mempool]
enable_rate_limits = true
```

A node missing this setting will accept unlimited submissions from the offending FID and gossip them to the rest of the network.

### 4. Identify signer keys for the FID

```
GET https://snap.farcaster.xyz:3381/v1/onChainSignersByFid?fid=<FID>
```

Note the signer keys and their `addedAt` timestamps — newly added keys near the incident start indicate intentional abuse with a fresh signer.

### 5. Short-term mitigation

If spam is ongoing and rate limits are not sufficient:

- Add `enable_rate_limits = true` to any validator config missing it and restart that node
- To stop a specific FID, a code change is required: add `blocked_fids: HashSet<u64>` to `mempool::Config` and check it in `message_is_valid()` (src/mempool/mempool.rs) before the rate limit check
- A node restart clears the in-memory mempool, but messages will re-gossip from other nodes unless the source is blocked at submission
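
The FID block described in the second bullet could look roughly like the sketch below. It is illustrative only: `Config` here is a hypothetical subset of `mempool::Config`, `RejectReason` and `check_blocked` are invented names, and the real change would return the node's own error type from `message_is_valid()`.

```rust
use std::collections::HashSet;

/// Hypothetical subset of `mempool::Config`; `blocked_fids` is the proposed field.
pub struct Config {
    pub enable_rate_limits: bool,
    pub blocked_fids: HashSet<u64>,
}

/// Hypothetical error type standing in for the node's real error.
#[derive(Debug, PartialEq)]
pub enum RejectReason {
    BlockedFid(u64),
}

/// Meant to run at the top of validation, before the rate limit check,
/// so a blocked FID never consumes rate limit tokens.
pub fn check_blocked(config: &Config, fid: u64) -> Result<(), RejectReason> {
    if config.blocked_fids.contains(&fid) {
        return Err(RejectReason::BlockedFid(fid));
    }
    Ok(())
}
```

A `HashSet` lookup keeps the check constant-time per message, comparable to the single `HashMap` lookup used by the per-FID cap.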

### 6. Escalate if

- Block commit rate is consistently near `max_messages_per_block` (1000) — legitimate messages may be getting crowded out of blocks
- The spike persists after `enable_rate_limits = true` is confirmed on all validators
- Multiple FIDs are involved simultaneously (coordinated attack)
78 changes: 71 additions & 7 deletions src/mempool/mempool.rs
@@ -130,6 +130,7 @@ pub struct Config {
pub capacity_per_shard: u64,
pub rx_poll_interval: Duration,
pub enable_rate_limits: bool,
pub max_mempool_messages_per_fid: u32, // 0 = disabled
}

impl Default for Config {
@@ -140,6 +141,7 @@ impl Default for Config {
capacity_per_shard: 1_000_000,
rx_poll_interval: Duration::from_millis(1),
enable_rate_limits: false,
max_mempool_messages_per_fid: 100,
}
}
}
@@ -489,6 +491,7 @@ pub struct Mempool {
statsd_client: StatsdClientWrapper,
read_node_mempool: ReadNodeMempool,
rate_limits: Option<RateLimits>,
fid_message_counts: HashMap<u64, usize>, // fid -> pending message count across all shards
}

impl Mempool {
@@ -507,6 +510,7 @@ impl Mempool {
) -> Self {
Mempool {
messages: HashMap::new(),
fid_message_counts: HashMap::new(),
messages_request_rx,
shard_decision_rx,
block_decision_rx,
@@ -550,6 +554,21 @@ impl Mempool {
}
}

fn fid_exceeds_mempool_limit(&self, message: &MempoolMessage) -> bool {
if self.config.max_mempool_messages_per_fid == 0 {
return false;
}
match message {
MempoolMessage::UserMessage(_) => self
.fid_message_counts
.get(&message.fid())
.map_or(false, |count| {
*count >= self.config.max_mempool_messages_per_fid as usize
}),
_ => false, // Never limit validator/onchain/fname/block messages
}
}

fn message_already_exists(&mut self, shard: u32, message: &MempoolMessage) -> bool {
self.read_node_mempool
.message_already_exists(shard, message)
@@ -565,6 +584,15 @@ impl Mempool {
match shard_messages.pop_first() {
None => break,
Some((_, next_message)) => {
if matches!(next_message, MempoolMessage::UserMessage(_)) {
let fid = next_message.fid();
if let Some(count) = self.fid_message_counts.get_mut(&fid) {
*count = count.saturating_sub(1);
if *count == 0 {
self.fid_message_counts.remove(&fid);
}
}
}
let result = self.message_is_valid(request.shard_id, &next_message);
if result.is_ok() {
messages.push(next_message);
@@ -660,9 +688,20 @@ impl Mempool {
None => {}
}

// Per-FID mempool cap: reject before consuming rate limit tokens
if self.fid_exceeds_mempool_limit(&message) {
self.statsd_client
.count_with_shard(shard_id, "mempool.per_fid_limit_hit", 1, vec![]);
return Err(HubError::rate_limited(&format!(
"per-fid mempool limit exceeded for FID {}",
message.fid()
)));
}

// TODO(aditi): Maybe we don't need to run validations here?
let result = self.message_is_valid(shard_id, &message);
if result.is_ok() {
let fid = message.fid();
match self.messages.get_mut(&shard_id) {
None => {
let mut messages = BTreeMap::new();
@@ -681,6 +720,14 @@ impl Mempool {
}
}

// Increment per-FID count for UserMessage variants
if matches!(message, MempoolMessage::UserMessage(_)) {
self.fid_message_counts
.entry(fid)
.and_modify(|c| *c += 1)
.or_insert(1);
}

self.statsd_client
.count_with_shard(shard_id, "mempool.insert.success", 1, vec![]);

@@ -697,13 +744,30 @@ impl Mempool {
 if let Some(mempool) = self.messages.get_mut(&height.shard_index) {
 for transaction in transactions {
 for user_message in &transaction.user_messages {
-mempool.remove(&user_message.mempool_key());
-self.statsd_client.count_with_shard(
-height.shard_index,
-"mempool.remove.success",
-1,
-vec![],
-);
+if let Some(_) = mempool.remove(&user_message.mempool_key()) {
+// Only decrement if the message was still present (non-proposer path).
+// Proposers already decremented in pull_messages.
+let fid = user_message.fid();
+if let Some(count) = self.fid_message_counts.get_mut(&fid) {
+*count = count.saturating_sub(1);
+if *count == 0 {
+self.fid_message_counts.remove(&fid);
+}
+}
+self.statsd_client.count_with_shard(
+height.shard_index,
+"mempool.remove.success",
+1,
+vec![],
+);
+} else {
+self.statsd_client.count_with_shard(
+height.shard_index,
+"mempool.remove.success",
+1,
+vec![],
+);
+}
 }
 for system_message in &transaction.system_messages {
 mempool.remove(&system_message.mempool_key());