Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/network/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,21 +309,24 @@ impl SnapchainGossip {
return Err(Box::new(e));
}
} else {
// Create a Gossipsub topic
let topic = gossipsub::IdentTopic::new(CONSENSUS_TOPIC);
// subscribes to our topic
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}
}

let topic = gossipsub::IdentTopic::new(MEMPOOL_TOPIC);
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}
// Both validators and read nodes join the mempool mesh: validators
// consume mempool messages for inclusion in blocks; read nodes accept
// client-submitted messages via RPC and need to be useful relays for
// them. A node that publishes to a topic without subscribing falls
// back to fanout (best-effort, TTL'd, not in the mesh).
let topic = gossipsub::IdentTopic::new(MEMPOOL_TOPIC);
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}

let topic = gossipsub::IdentTopic::new(CONTACT_INFO);
Expand Down
65 changes: 65 additions & 0 deletions src/network/gossip_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,71 @@ async fn test_gossip_communication() {
assert_eq!(receive_counts, 1);
}

/// Regression test for #865: read nodes must subscribe to MEMPOOL_TOPIC so that
/// they receive mempool gossip from validator peers. Before the fix, a read
/// node never joined the mempool mesh and never delivered inbound mempool
/// messages to its system channel, regardless of how many validator peers
/// were publishing.
#[tokio::test]
#[serial]
async fn test_read_node_receives_mempool_gossip() {
let validator_keypair = Keypair::generate();
let read_node_keypair = Keypair::generate();

let validator_port = BASE_PORT_FOR_TEST + 20;
let validator_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{validator_port}/quic-v1");

let read_node_port = BASE_PORT_FOR_TEST + 21;
let read_node_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{read_node_port}/quic-v1");

let validator_config = Config::new(validator_addr.clone(), read_node_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));
let read_node_config = Config::new(read_node_addr.clone(), validator_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));

let (validator_system_tx, _) = mpsc::channel::<SystemMessage>(100);
let (read_node_system_tx, mut read_node_system_rx) = mpsc::channel::<SystemMessage>(100);

let mut validator_gossip = SnapchainGossip::create(
validator_keypair,
&validator_config,
Some(validator_system_tx),
false,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let mut read_node_gossip = SnapchainGossip::create(
read_node_keypair,
&read_node_config,
Some(read_node_system_tx),
true,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let validator_tx = validator_gossip.tx.clone();

tokio::spawn(async move { validator_gossip.start().await });
tokio::spawn(async move { read_node_gossip.start().await });

tokio::time::sleep(Duration::from_secs(1)).await;

let cast_add = messages_factory::casts::create_cast_add(456, "regression", None, None);
let mempool_msg = MempoolMessage::UserMessage(cast_add.clone());
validator_tx
.send(GossipEvent::BroadcastMempoolMessage(mempool_msg))
.await
.unwrap();

let receive_counts = wait_for_message(&mut read_node_system_rx, cast_add).await;
assert_eq!(receive_counts, 1);
Comment thread
manan19 marked this conversation as resolved.
Outdated
}

#[tokio::test]
#[serial]
async fn test_bootstrap_peer_reconnection() {
Expand Down
Loading