Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ impl proto::ValidatorMessage {
block_event.block_timestamp(),
block_event.seqnum().to_string(),
)
} else if let Some(revalidate_message) = &self.revalidate_message {
MempoolKey::new(
MempoolMessageKind::ValidatorMessage,
revalidate_message
.message
.as_ref()
.unwrap()
.data
.as_ref()
.map(|data| data.timestamp)
.unwrap_or(0) as u64,
hex::encode(revalidate_message.message.as_ref().unwrap().hash.clone()),
)
} else {
MempoolKey::new(
MempoolMessageKind::ValidatorMessage,
Expand Down Expand Up @@ -236,6 +249,7 @@ impl MempoolMessage {
on_chain_event: Some(event.clone()),
fname_transfer: None,
block_event: None,
revalidate_message: None,
};
validator_message.mempool_key()
}
Expand All @@ -244,6 +258,7 @@ impl MempoolMessage {
on_chain_event: None,
fname_transfer: Some(fname.clone()),
block_event: None,
revalidate_message: None,
};
validator_message.mempool_key()
}
Expand All @@ -255,6 +270,16 @@ impl MempoolMessage {
on_chain_event: None,
fname_transfer: None,
block_event: Some(block_event.clone()),
revalidate_message: None,
};
validator_message.mempool_key()
}
MempoolMessage::RevalidateMessage(revalidate_message) => {
let validator_message = proto::ValidatorMessage {
on_chain_event: None,
fname_transfer: None,
block_event: None,
revalidate_message: Some(revalidate_message.clone()),
};
validator_message.mempool_key()
}
Expand Down Expand Up @@ -335,6 +360,7 @@ impl ReadNodeMempool {
}
},
MempoolMessage::OnchainEvent(_)
| MempoolMessage::RevalidateMessage(_)
| MempoolMessage::FnameTransfer(_)
| MempoolMessage::BlockEvent { .. } => {
// Don't do duplicate checks for validator messages. They are infrequent, and engine can handle duplicates.
Expand Down Expand Up @@ -461,6 +487,7 @@ impl Mempool {
}
}
MempoolMessage::OnchainEvent(_)
| MempoolMessage::RevalidateMessage(_)
| MempoolMessage::FnameTransfer(_)
| MempoolMessage::BlockEvent { .. } => false,
}
Expand Down
2 changes: 2 additions & 0 deletions src/mempool/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,15 @@ mod tests {
panic!("Expected validator message, got user message")
}
MempoolMessage::OnchainEvent(_)
| MempoolMessage::RevalidateMessage(_)
| MempoolMessage::FnameTransfer(_)
| MempoolMessage::BlockEvent { .. } => {}
}

match pull_message().await {
MempoolMessage::UserMessage(_) => {}
MempoolMessage::OnchainEvent(_)
| MempoolMessage::RevalidateMessage(_)
| MempoolMessage::FnameTransfer(_)
| MempoolMessage::BlockEvent { .. } => {
panic!("Expected user message, got validator message")
Expand Down
10 changes: 10 additions & 0 deletions src/proto/blocks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,21 @@ message FnameTransfer {
UserNameProof proof = 4;
}

message ExternalData {
optional bytes ens_resolved_address = 1;
}

message RevalidateMessage {
Message message = 1;
optional ExternalData external_data = 2;
}

// Validator initiated prunes/revokes etc
message ValidatorMessage {
OnChainEvent on_chain_event = 1;
FnameTransfer fname_transfer = 2;
BlockEvent block_event = 3;
RevalidateMessage revalidate_message = 4;
}


Expand Down
66 changes: 60 additions & 6 deletions src/storage/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::proto::message_data::Body;
use crate::proto::BlockEvent;
use crate::proto::{
self, hub_event, FarcasterNetwork, HubEvent, HubEventType, MessageType, OnChainEvent,
OnChainEventType, Protocol, ShardChunk, Transaction, UserDataType, UserNameProof,
OnChainEventType, Protocol, RevalidateMessage, ShardChunk, Transaction, UserDataType,
UserNameProof,
};
use crate::storage::db::{PageOptions, RocksDB, RocksDbTransactionBatch};
use crate::storage::store::account::{
Expand Down Expand Up @@ -606,6 +607,35 @@ impl ShardEngine {
})
}

fn revalidate_user_message(
&mut self,
revalidate_message: &RevalidateMessage,
timestamp: &FarcasterTime,
version: EngineVersion,
txn_batch: &RocksDbTransactionBatch,
) -> bool {
let message = revalidate_message.message.as_ref().unwrap();
match &message.data.as_ref().unwrap().body {
Some(Body::UsernameProofBody(username_proof)) => {
if let Some(external_data) = &revalidate_message.external_data {
if let Some(ens_resolved_address) = &external_data.ens_resolved_address {
if let Err(_) = self.stores.validate_ens_username_proof(
message.fid(),
&username_proof,
ens_resolved_address,
) {
return false;
}
}
}
}
_ => {}
};

self.validate_user_message(&message, timestamp, version, true, txn_batch)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why hardcode true for is_pro_user?

Copy link
Copy Markdown
Contributor Author

@aditiharini aditiharini Sep 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to determine if the user was a pro user at the block timestamp where the message was originally processed in order to infer this state correctly. We don't have a good way to go from message -> block right now. Defaulting to true essentially always allows the pro-user specific features and bypasses this validation.

.is_ok()
}

pub(crate) fn replay_snapchain_txn(
&mut self,
trie_ctx: &merkle_trie::Context,
Expand Down Expand Up @@ -650,6 +680,29 @@ impl ShardEngine {
}
});
for msg in sorted_system_messages {
if version.is_enabled(ProtocolFeature::RevalidateMessage) {
if let Some(revalidate_message) = &msg.revalidate_message {
let is_valid = self.revalidate_user_message(
&revalidate_message,
timestamp,
version,
txn_batch,
);

if !is_valid {
if let Ok(event) = self.stores.revoke_message(
&revalidate_message.message.as_ref().unwrap(),
txn_batch,
) {
self.update_trie(trie_ctx, &event, txn_batch)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be worth logging here if we're revoking due to a revalidate. I don't think we have any other visibility into the fact that revalidates are happening?

events.push(event);
}
}

system_messages_count += 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should emit a metric for revalidate messages somewhere. Otherwise we'd have no visibility that these messages are being submitted and processed.

}
}

if let Some(onchain_event) = &msg.on_chain_event {
if onchain_event.r#type() == OnChainEventType::EventTypeTierPurchase
&& !version.is_enabled(ProtocolFeature::FarcasterPro)
Expand Down Expand Up @@ -854,7 +907,11 @@ impl ShardEngine {

for msg in &sorted_user_messages {
// Errors are validated based on the shard root
match self.validate_user_message(msg, timestamp, version, txn_batch) {
let is_pro_user = self
.stores
.is_pro_user(msg.fid(), timestamp)
.map_err(|err| HubError::internal_db_error(&err.to_string()))?;
match self.validate_user_message(msg, timestamp, version, is_pro_user, txn_batch) {
Ok(()) => {
let result = self.merge_message(msg, txn_batch);
match result {
Expand Down Expand Up @@ -1270,6 +1327,7 @@ impl ShardEngine {
message: &proto::Message,
timestamp: &FarcasterTime,
version: EngineVersion,
is_pro_user: bool,
txn_batch: &RocksDbTransactionBatch,
) -> Result<(), MessageValidationError> {
let now = std::time::Instant::now();
Expand All @@ -1286,10 +1344,6 @@ impl ShardEngine {
.as_ref()
.ok_or(MessageValidationError::NoMessageData)?;

let is_pro_user = self
.stores
.is_pro_user(message.fid(), timestamp)
.map_err(|err| HubError::internal_db_error(&err.to_string()))?;
validations::message::validate_message(
message,
self.network,
Expand Down
131 changes: 131 additions & 0 deletions src/storage/store/engine_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3809,4 +3809,135 @@ mod tests {
assert!(block_event_exists(&engine, &block_event4));
assert_eq!(block_confirmed.max_block_event_seqnum, 4);
}

#[tokio::test]
async fn test_revalidate_message_username_proof_with_valid_ens() {
let (mut engine, _tmpdir) = test_helper::new_engine().await;
let signer = test_helper::default_signer();
let timestamp = messages_factory::farcaster_time();
let ens_address = test_helper::default_custody_address();

test_helper::register_user(
FID_FOR_TEST,
signer.clone(),
ens_address.clone(),
&mut engine,
)
.await;

// Create and commit a username proof message first
let username_proof = messages_factory::username_proof::create_username_proof(
FID_FOR_TEST as u64,
proto::UserNameType::UsernameTypeEnsL1,
"test.eth".to_string(),
ens_address.clone(),
"signature".to_string(),
timestamp as u64,
Some(&signer),
);

test_helper::commit_message(&mut engine, &username_proof).await;

// Create external data for valid ENS resolution
let external_data = proto::ExternalData {
ens_resolved_address: Some(ens_address),
};

// Process revalidation request with valid ENS data
test_helper::commit_revalidate_message(
&mut engine,
&username_proof,
Some(external_data),
test_helper::Validity::Valid,
)
.await;
}

#[tokio::test]
async fn test_revalidate_message_username_proof_with_invalid_ens() {
let (mut engine, _tmpdir) = test_helper::new_engine().await;
let signer = test_helper::default_signer();
let timestamp = messages_factory::farcaster_time();

test_helper::register_user(
FID_FOR_TEST,
signer.clone(),
test_helper::default_custody_address(),
&mut engine,
)
.await;

// Create and commit a username proof message first
let username_proof = messages_factory::username_proof::create_username_proof(
FID_FOR_TEST as u64,
proto::UserNameType::UsernameTypeEnsL1,
"test.eth".to_string(),
"owner".to_string().encode_to_vec(),
"signature".to_string(),
timestamp as u64,
Some(&signer),
);

test_helper::commit_message(&mut engine, &username_proof).await;

// Create external data with invalid ENS address (different from custody address)
let invalid_address = vec![0u8; 20]; // Invalid/different address
let external_data = proto::ExternalData {
ens_resolved_address: Some(invalid_address),
};

// Process revalidation request with invalid ENS data - should mark as invalid
test_helper::commit_revalidate_message(
&mut engine,
&username_proof,
Some(external_data),
test_helper::Validity::Invalid,
)
.await;
assert!(engine
.get_username_proofs_by_fid(FID_FOR_TEST)
.unwrap()
.messages
.is_empty());
}

#[tokio::test]
async fn test_revalidate_message_invalid_message_validation() {
let (mut engine, _tmpdir) = test_helper::new_engine().await;
let signer = test_helper::default_signer();

test_helper::register_user(
FID_FOR_TEST,
signer.clone(),
test_helper::default_custody_address(),
&mut engine,
)
.await;

// Create a message with invalid timestamp (in the future)
let future_timestamp = get_farcaster_time().unwrap() + 3600; // 1 hour in the future
let invalid_cast = messages_factory::casts::create_cast_add(
FID_FOR_TEST,
"invalid cast",
Some(future_timestamp as u32),
Some(&signer),
);

// These messages were allowed in the past but aren't allowed now
commit_message_at(&mut engine, &invalid_cast, &FarcasterTime::new(0)).await;

// Revalidation should treat the message as invalid and get rid of it
test_helper::commit_revalidate_message(
&mut engine,
&invalid_cast,
None,
test_helper::Validity::Invalid,
)
.await;
assert!(engine
.get_casts_by_fid(FID_FOR_TEST)
.unwrap()
.messages
.is_empty());
}
}
15 changes: 15 additions & 0 deletions src/storage/store/mempool_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum MempoolMessage {
for_shard: u32,
message: proto::BlockEvent,
},
RevalidateMessage(proto::RevalidateMessage),
}

impl MempoolMessage {
Expand All @@ -48,6 +49,9 @@ impl MempoolMessage {
for_shard: _,
message: _,
} => 0,
MempoolMessage::RevalidateMessage(revalidate_message) => {
revalidate_message.message.as_ref().unwrap().fid()
}
}
}

Expand Down Expand Up @@ -145,19 +149,30 @@ impl MempoolPoller {
on_chain_event: None,
fname_transfer: None,
block_event: Some(message.clone()),
revalidate_message: None,
}),
MempoolMessage::FnameTransfer(fname_transfer) => {
transaction.system_messages.push(ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(fname_transfer.clone()),
block_event: None,
revalidate_message: None,
})
}
MempoolMessage::OnchainEvent(onchain_event) => {
transaction.system_messages.push(ValidatorMessage {
on_chain_event: Some(onchain_event.clone()),
fname_transfer: None,
block_event: None,
revalidate_message: None,
})
}
MempoolMessage::RevalidateMessage(revalidate_message) => {
transaction.system_messages.push(ValidatorMessage {
on_chain_event: None,
fname_transfer: None,
block_event: None,
revalidate_message: Some(revalidate_message.clone()),
})
}
}
Expand Down
Loading
Loading