diff --git a/src/mempool/mempool.rs b/src/mempool/mempool.rs index e442b99b3..d99608ec6 100644 --- a/src/mempool/mempool.rs +++ b/src/mempool/mempool.rs @@ -200,6 +200,19 @@ impl proto::ValidatorMessage { block_event.block_timestamp(), block_event.seqnum().to_string(), ) + } else if let Some(revalidate_message) = &self.revalidate_message { + MempoolKey::new( + MempoolMessageKind::ValidatorMessage, + revalidate_message + .message + .as_ref() + .unwrap() + .data + .as_ref() + .map(|data| data.timestamp) + .unwrap_or(0) as u64, + hex::encode(revalidate_message.message.as_ref().unwrap().hash.clone()), + ) } else { MempoolKey::new( MempoolMessageKind::ValidatorMessage, @@ -236,6 +249,7 @@ impl MempoolMessage { on_chain_event: Some(event.clone()), fname_transfer: None, block_event: None, + revalidate_message: None, }; validator_message.mempool_key() } @@ -244,6 +258,7 @@ impl MempoolMessage { on_chain_event: None, fname_transfer: Some(fname.clone()), block_event: None, + revalidate_message: None, }; validator_message.mempool_key() } @@ -255,6 +270,16 @@ impl MempoolMessage { on_chain_event: None, fname_transfer: None, block_event: Some(block_event.clone()), + revalidate_message: None, + }; + validator_message.mempool_key() + } + MempoolMessage::RevalidateMessage(revalidate_message) => { + let validator_message = proto::ValidatorMessage { + on_chain_event: None, + fname_transfer: None, + block_event: None, + revalidate_message: Some(revalidate_message.clone()), }; validator_message.mempool_key() } @@ -335,6 +360,7 @@ impl ReadNodeMempool { } }, MempoolMessage::OnchainEvent(_) + | MempoolMessage::RevalidateMessage(_) | MempoolMessage::FnameTransfer(_) | MempoolMessage::BlockEvent { .. } => { // Don't do duplicate checks for validator messages. They are infrequent, and engine can handle duplicates. @@ -461,6 +487,7 @@ impl Mempool { } } MempoolMessage::OnchainEvent(_) + | MempoolMessage::RevalidateMessage(_) | MempoolMessage::FnameTransfer(_) | MempoolMessage::BlockEvent { .. } => false, } diff --git a/src/mempool/mempool_test.rs b/src/mempool/mempool_test.rs index 2c6d2ebb1..b69476a71 100644 --- a/src/mempool/mempool_test.rs +++ b/src/mempool/mempool_test.rs @@ -395,6 +395,7 @@ mod tests { panic!("Expected validator message, got user message") } MempoolMessage::OnchainEvent(_) + | MempoolMessage::RevalidateMessage(_) | MempoolMessage::FnameTransfer(_) | MempoolMessage::BlockEvent { .. } => {} } @@ -402,6 +403,7 @@ mod tests { match pull_message().await { MempoolMessage::UserMessage(_) => {} MempoolMessage::OnchainEvent(_) + | MempoolMessage::RevalidateMessage(_) | MempoolMessage::FnameTransfer(_) | MempoolMessage::BlockEvent { .. } => { panic!("Expected user message, got validator message") diff --git a/src/proto/blocks.proto b/src/proto/blocks.proto index bf2facd32..5391bc558 100644 --- a/src/proto/blocks.proto +++ b/src/proto/blocks.proto @@ -182,11 +182,21 @@ message FnameTransfer { UserNameProof proof = 4; } +message ExternalData { + optional bytes ens_resolved_address = 1; +} + +message RevalidateMessage { + Message message = 1; + optional ExternalData external_data = 2; +} + // Validator initiated prunes/revokes etc message ValidatorMessage { OnChainEvent on_chain_event = 1; FnameTransfer fname_transfer = 2; BlockEvent block_event = 3; + RevalidateMessage revalidate_message = 4; } diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index f85e7d095..45353651a 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -8,7 +8,8 @@ use crate::proto::message_data::Body; use crate::proto::BlockEvent; use crate::proto::{ self, hub_event, FarcasterNetwork, HubEvent, HubEventType, MessageType, OnChainEvent, - OnChainEventType, Protocol, ShardChunk, Transaction, UserDataType, UserNameProof, + OnChainEventType, Protocol, RevalidateMessage, ShardChunk, Transaction, UserDataType, + UserNameProof, }; use crate::storage::db::{PageOptions, RocksDB, RocksDbTransactionBatch}; use crate::storage::store::account::{ @@ -606,6 +607,35 @@ impl ShardEngine { }) } + fn revalidate_user_message( + &mut self, + revalidate_message: &RevalidateMessage, + timestamp: &FarcasterTime, + version: EngineVersion, + txn_batch: &RocksDbTransactionBatch, + ) -> bool { + let message = revalidate_message.message.as_ref().unwrap(); + match &message.data.as_ref().unwrap().body { + Some(Body::UsernameProofBody(username_proof)) => { + if let Some(external_data) = &revalidate_message.external_data { + if let Some(ens_resolved_address) = &external_data.ens_resolved_address { + if let Err(_) = self.stores.validate_ens_username_proof( + message.fid(), + &username_proof, + ens_resolved_address, + ) { + return false; + } + } + } + } + _ => {} + }; + + self.validate_user_message(&message, timestamp, version, true, txn_batch) + .is_ok() + } + pub(crate) fn replay_snapchain_txn( &mut self, trie_ctx: &merkle_trie::Context, @@ -650,6 +680,29 @@ impl ShardEngine { } }); for msg in sorted_system_messages { + if version.is_enabled(ProtocolFeature::RevalidateMessage) { + if let Some(revalidate_message) = &msg.revalidate_message { + let is_valid = self.revalidate_user_message( + &revalidate_message, + timestamp, + version, + txn_batch, + ); + + if !is_valid { + if let Ok(event) = self.stores.revoke_message( + &revalidate_message.message.as_ref().unwrap(), + txn_batch, + ) { + self.update_trie(trie_ctx, &event, txn_batch)?; + events.push(event); + } + } + + system_messages_count += 1; + } + } + if let Some(onchain_event) = &msg.on_chain_event { if onchain_event.r#type() == OnChainEventType::EventTypeTierPurchase && !version.is_enabled(ProtocolFeature::FarcasterPro) @@ -854,7 +907,11 @@ impl ShardEngine { for msg in &sorted_user_messages { // Errors are validated based on the shard root - match self.validate_user_message(msg, timestamp, version, txn_batch) { + let is_pro_user = self + .stores + .is_pro_user(msg.fid(), timestamp) + .map_err(|err| HubError::internal_db_error(&err.to_string()))?; + match self.validate_user_message(msg, timestamp, version, is_pro_user, txn_batch) { Ok(()) => { let result = self.merge_message(msg, txn_batch); match result { @@ -1270,6 +1327,7 @@ impl ShardEngine { message: &proto::Message, timestamp: &FarcasterTime, version: EngineVersion, + is_pro_user: bool, txn_batch: &RocksDbTransactionBatch, ) -> Result<(), MessageValidationError> { let now = std::time::Instant::now(); @@ -1286,10 +1344,6 @@ impl ShardEngine { .as_ref() .ok_or(MessageValidationError::NoMessageData)?; - let is_pro_user = self - .stores - .is_pro_user(message.fid(), timestamp) - .map_err(|err| HubError::internal_db_error(&err.to_string()))?; validations::message::validate_message( message, self.network, diff --git a/src/storage/store/engine_tests.rs b/src/storage/store/engine_tests.rs index d373e0468..6c9542d2c 100644 --- a/src/storage/store/engine_tests.rs +++ b/src/storage/store/engine_tests.rs @@ -3809,4 +3809,135 @@ mod tests { assert!(block_event_exists(&engine, &block_event4)); assert_eq!(block_confirmed.max_block_event_seqnum, 4); } + + #[tokio::test] + async fn test_revalidate_message_username_proof_with_valid_ens() { + let (mut engine, _tmpdir) = test_helper::new_engine().await; + let signer = test_helper::default_signer(); + let timestamp = messages_factory::farcaster_time(); + let ens_address = test_helper::default_custody_address(); + + test_helper::register_user( + FID_FOR_TEST, + signer.clone(), + ens_address.clone(), + &mut engine, + ) + .await; + + // Create and commit a username proof message first + let username_proof = messages_factory::username_proof::create_username_proof( + FID_FOR_TEST as u64, + proto::UserNameType::UsernameTypeEnsL1, + "test.eth".to_string(), + ens_address.clone(), + "signature".to_string(), + timestamp as u64, + Some(&signer), + ); + + test_helper::commit_message(&mut engine, &username_proof).await; + + // Create external data for valid ENS resolution + let external_data = proto::ExternalData { + ens_resolved_address: Some(ens_address), + }; + + // Process revalidation request with valid ENS data + test_helper::commit_revalidate_message( + &mut engine, + &username_proof, + Some(external_data), + test_helper::Validity::Valid, + ) + .await; + } + + #[tokio::test] + async fn test_revalidate_message_username_proof_with_invalid_ens() { + let (mut engine, _tmpdir) = test_helper::new_engine().await; + let signer = test_helper::default_signer(); + let timestamp = messages_factory::farcaster_time(); + + test_helper::register_user( + FID_FOR_TEST, + signer.clone(), + test_helper::default_custody_address(), + &mut engine, + ) + .await; + + // Create and commit a username proof message first + let username_proof = messages_factory::username_proof::create_username_proof( + FID_FOR_TEST as u64, + proto::UserNameType::UsernameTypeEnsL1, + "test.eth".to_string(), + "owner".to_string().encode_to_vec(), + "signature".to_string(), + timestamp as u64, + Some(&signer), + ); + + test_helper::commit_message(&mut engine, &username_proof).await; + + // Create external data with invalid ENS address (different from custody address) + let invalid_address = vec![0u8; 20]; // Invalid/different address + let external_data = proto::ExternalData { + ens_resolved_address: Some(invalid_address), + }; + + // Process revalidation request with invalid ENS data - should mark as invalid + test_helper::commit_revalidate_message( + &mut engine, + &username_proof, + Some(external_data), + test_helper::Validity::Invalid, + ) + .await; + assert!(engine + .get_username_proofs_by_fid(FID_FOR_TEST) + .unwrap() + .messages + .is_empty()); + } + + #[tokio::test] + async fn test_revalidate_message_invalid_message_validation() { + let (mut engine, _tmpdir) = test_helper::new_engine().await; + let signer = test_helper::default_signer(); + + test_helper::register_user( + FID_FOR_TEST, + signer.clone(), + test_helper::default_custody_address(), + &mut engine, + ) + .await; + + // Create a message with invalid timestamp (in the future) + let future_timestamp = get_farcaster_time().unwrap() + 3600; // 1 hour in the future + let invalid_cast = messages_factory::casts::create_cast_add( + FID_FOR_TEST, + "invalid cast", + Some(future_timestamp as u32), + Some(&signer), + ); + + // These messages were allowed in the past but aren't allowed now + commit_message_at(&mut engine, &invalid_cast, &FarcasterTime::new(0)).await; + + // Revalidation should treat the message as invalid and get rid of it + test_helper::commit_revalidate_message( + &mut engine, + &invalid_cast, + None, + test_helper::Validity::Invalid, + ) + .await; + assert!(engine + .get_casts_by_fid(FID_FOR_TEST) + .unwrap() + .messages + .is_empty()); + } } diff --git a/src/storage/store/mempool_poller.rs b/src/storage/store/mempool_poller.rs index 20bf4edc8..cf3070414 100644 --- a/src/storage/store/mempool_poller.rs +++ b/src/storage/store/mempool_poller.rs @@ -36,6 +36,7 @@ pub enum MempoolMessage { for_shard: u32, message: proto::BlockEvent, }, + RevalidateMessage(proto::RevalidateMessage), } impl MempoolMessage { @@ -48,6 +49,9 @@ impl MempoolMessage { for_shard: _, message: _, } => 0, + MempoolMessage::RevalidateMessage(revalidate_message) => { + revalidate_message.message.as_ref().unwrap().fid() + } } } @@ -145,12 +149,14 @@ impl MempoolPoller { on_chain_event: None, fname_transfer: None, block_event: Some(message.clone()), + revalidate_message: None, }), MempoolMessage::FnameTransfer(fname_transfer) => { transaction.system_messages.push(ValidatorMessage { on_chain_event: None, fname_transfer: Some(fname_transfer.clone()), block_event: None, + revalidate_message: None, }) } MempoolMessage::OnchainEvent(onchain_event) => { @@ -158,6 +164,15 @@ impl MempoolPoller { on_chain_event: Some(onchain_event.clone()), fname_transfer: None, block_event: None, + revalidate_message: None, + }) + } + MempoolMessage::RevalidateMessage(revalidate_message) => { + transaction.system_messages.push(ValidatorMessage { + on_chain_event: None, + fname_transfer: None, + block_event: None, + revalidate_message: Some(revalidate_message.clone()), }) } } diff --git a/src/storage/store/stores.rs b/src/storage/store/stores.rs index 9f01b7939..2107ec7f5 100644 --- a/src/storage/store/stores.rs +++ b/src/storage/store/stores.rs @@ -444,6 +444,30 @@ impl Stores { Ok(response) } + pub fn revoke_message( + &mut self, + message: &proto::Message, + txn: &mut RocksDbTransactionBatch, + ) -> Result { + match message.msg_type() { + MessageType::FrameAction | MessageType::None => { + Err(HubError::internal_db_error("invalid message type")) + } + MessageType::CastAdd | MessageType::CastRemove => self.cast_store.revoke(message, txn), + MessageType::ReactionAdd | MessageType::ReactionRemove => { + self.reaction_store.revoke(message, txn) + } + MessageType::LinkCompactState | MessageType::LinkAdd | MessageType::LinkRemove => { + self.link_store.revoke(message, txn) + } + MessageType::VerificationAddEthAddress | MessageType::VerificationRemove => { + self.verification_store.revoke(message, txn) + } + MessageType::UserDataAdd => self.user_data_store.revoke(message, txn), + MessageType::UsernameProof => self.username_proof_store.revoke(message, txn), + } + } + pub fn revoke_messages( &self, fid: u64, @@ -645,6 +669,51 @@ impl Stores { ); Ok(count) } + + pub fn validate_ens_username_proof( + &self, + fid: u64, + proof: &proto::UserNameProof, + resolved_ens_address: &Vec, + ) -> Result<(), HubError> { + if *resolved_ens_address != proof.owner { + return Err(HubError::validation_failure( + "invalid ens name, resolved address doesn't match proof owner address", + )); + } + + let id_register = self + .onchain_event_store + .get_id_register_event_by_fid(fid, None) + .map_err(|_| HubError::internal_db_error("Could not fetch id registration"))?; + + match id_register { + None => return Err(HubError::validation_failure("missing fid registration")), + Some(id_register) => { + match id_register.body { + Some(proto::on_chain_event::Body::IdRegisterEventBody(id_register)) => { + // Check verified addresses if the resolved address doesn't match the custody address + if id_register.to != *resolved_ens_address { + let verification = VerificationStore::get_verification_add( + &self.verification_store, + fid, + &resolved_ens_address, + None, + )?; + + match verification { + None => Err(HubError::validation_failure("invalid ens proof, no matching custody address or verified addresses")), + Some(_) => Ok(()), + } + } else { + Ok(()) + } + } + _ => return Err(HubError::validation_failure("missing fid registration")), + } + } + } + } } #[cfg(test)] diff --git a/src/storage/store/test_helper.rs b/src/storage/store/test_helper.rs index e1263736f..d5a764e5f 100644 --- a/src/storage/store/test_helper.rs +++ b/src/storage/store/test_helper.rs @@ -239,7 +239,6 @@ pub async fn sign_chunk(keypair: &Keypair, mut shard_chunk: ShardChunk) -> Shard shard_chunk } -#[cfg(test)] pub async fn commit_message(engine: &mut ShardEngine, msg: &proto::Message) -> ShardChunk { let state_change = engine.propose_state_change(1, vec![MempoolMessage::UserMessage(msg.clone())], None); @@ -257,6 +256,40 @@ pub async fn commit_message(engine: &mut ShardEngine, msg: &proto::Message) -> S chunk } +pub enum Validity { + Valid, + Invalid, +} + +pub async fn commit_revalidate_message( + engine: &mut ShardEngine, + message: &proto::Message, + external_data: Option, + validity: Validity, +) -> ShardChunk { + let state_change = engine.propose_state_change( + engine.shard_id(), + vec![MempoolMessage::RevalidateMessage( + proto::RevalidateMessage { + message: Some(message.clone()), + external_data, + }, + )], + None, + ); + + let shard_chunk = validate_and_commit_state_change(engine, &state_change).await; + let expect_trie_key_exists = match validity { + Validity::Invalid => false, + Validity::Valid => true, + }; + assert_eq!( + engine.trie_key_exists(trie_ctx(), &TrieKey::for_message(message)), + expect_trie_key_exists + ); + shard_chunk +} + // Note, this function does not check that the commit was successful, unlike `commit_message`. pub async fn commit_message_at( engine: &mut ShardEngine, @@ -308,7 +341,6 @@ pub async fn commit_messages(engine: &mut ShardEngine, msgs: Vec chunk } -#[cfg(test)] pub fn trie_ctx() -> &'static mut merkle_trie::Context<'static> { Box::leak(Box::new(merkle_trie::Context::new())) } diff --git a/src/version/version.rs b/src/version/version.rs index a66cb20bb..304cbc4e6 100644 --- a/src/version/version.rs +++ b/src/version/version.rs @@ -34,6 +34,7 @@ pub enum ProtocolFeature { WriteDataToShardZero, ReadDataFromShardZero, UserProfileToken, + RevalidateMessage, } pub struct VersionSchedule { @@ -167,9 +168,9 @@ impl EngineVersion { ProtocolFeature::DependentMessagesInBulkSubmit => self >= &EngineVersion::V7, ProtocolFeature::DecoupleShardZeroBlockProduction => self >= &EngineVersion::V8, ProtocolFeature::WriteDataToShardZero => self >= &EngineVersion::V9, - ProtocolFeature::ReadDataFromShardZero | ProtocolFeature::UserProfileToken => { - self >= &EngineVersion::V10 - } + ProtocolFeature::ReadDataFromShardZero + | ProtocolFeature::UserProfileToken + | ProtocolFeature::RevalidateMessage => self >= &EngineVersion::V10, } }