Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions src/network/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1143,9 +1143,8 @@ pub struct HubEvent {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EventsResponse {
pub events: Vec<HubEvent>,
// TODO: What's the best way to support next page token with multiple shards?
Comment thread
manan19 marked this conversation as resolved.
// #[serde(rename = "nextPageToken", skip_serializing_if = "Option::is_none")]
// pub next_page_token: Option<String>,
#[serde(rename = "nextPageToken", skip_serializing_if = "Option::is_none")]
pub next_page_token: Option<String>,
}

#[allow(non_snake_case)]
Expand Down Expand Up @@ -1983,12 +1982,9 @@ fn map_proto_messages_response_to_json_paged_response(
.iter()
.map(|m| map_proto_message_to_json_message(m.clone()).unwrap())
.collect(),
next_page_token: Some(
messages_response
.next_page_token
.map(|t| BASE64_STANDARD.encode(t))
.unwrap_or_else(|| "".to_string()),
),
next_page_token: messages_response
.next_page_token
.map(|t| BASE64_STANDARD.encode(t)),
})
}

Comment thread
manan19 marked this conversation as resolved.
Expand Down Expand Up @@ -2968,6 +2964,9 @@ where
.iter()
.map(|e| map_proto_hub_event_to_json_hub_event(e.clone()).unwrap())
.collect(),
next_page_token: events_response
.next_page_token
.map(|t| BASE64_STANDARD.encode(t)),
})
}

Expand Down Expand Up @@ -3557,3 +3556,38 @@ where
}
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn paged_response_omits_next_page_token_when_proto_token_is_none() {
let response = proto::MessagesResponse {
messages: vec![],
next_page_token: None,
};
let mapped = map_proto_messages_response_to_json_paged_response(response).unwrap();
let json = serde_json::to_value(&mapped).unwrap();
assert!(
json.get("nextPageToken").is_none(),
"expected nextPageToken to be omitted, got {json}",
);
}

#[test]
fn paged_response_base64_encodes_next_page_token_when_present() {
let token = vec![0u8, 1, 2, 253, 254, 255];
let expected = BASE64_STANDARD.encode(&token);
let response = proto::MessagesResponse {
messages: vec![],
next_page_token: Some(token),
};
let mapped = map_proto_messages_response_to_json_paged_response(response).unwrap();
let json = serde_json::to_value(&mapped).unwrap();
assert_eq!(
json.get("nextPageToken").and_then(|v| v.as_str()),
Some(expected.as_str()),
);
}
}
44 changes: 26 additions & 18 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,20 @@ impl MyHubService {
}
}

/// Helper function to serialize page tokens only if at least one shard has more data.
/// Returns None if all shards have finished pagination (all tokens are None).
fn serialize_page_token_if_any_shard_has_next(
next_page_tokens: Vec<Option<Vec<u8>>>,
) -> Result<Option<Vec<u8>>, Status> {
if next_page_tokens.iter().any(|token| token.is_some()) {
let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
Comment thread
manan19 marked this conversation as resolved.
Ok(Some(new_page_token))
} else {
Ok(None)
}
}
Comment thread
manan19 marked this conversation as resolved.

#[tonic::async_trait]
impl HubService for MyHubService {
async fn submit_message(
Expand Down Expand Up @@ -1467,11 +1481,11 @@ impl HubService for MyHubService {
pages.iter().flat_map(|page| page.events.clone()).collect();
let next_page_tokens: Vec<Option<Vec<u8>>> =
pages.into_iter().map(|page| page.next_page_token).collect();
let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?;

let response = EventsResponse {
events: combined_events,
next_page_token: Some(new_page_token),
next_page_token,
};

Ok(Response::new(response))
Expand Down Expand Up @@ -1800,11 +1814,11 @@ impl HubService for MyHubService {
.collect();
let next_page_tokens: Vec<Option<Vec<u8>>> =
pages.into_iter().map(|page| page.next_page_token).collect();
let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?;

let response = MessagesResponse {
messages: combined_messages,
next_page_token: Some(new_page_token),
next_page_token,
};

Ok(Response::new(response))
Expand Down Expand Up @@ -1859,13 +1873,11 @@ impl HubService for MyHubService {

let next_page_tokens: Vec<Option<Vec<u8>>> =
pages.into_iter().map(|page| page.next_page_token).collect();

let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?;

let response = MessagesResponse {
messages: combined_messages,
next_page_token: Some(new_page_token),
next_page_token,
};

Ok(Response::new(response))
Expand Down Expand Up @@ -1937,13 +1949,11 @@ impl HubService for MyHubService {

let next_page_tokens: Vec<Option<Vec<u8>>> =
pages.into_iter().map(|page| page.next_page_token).collect();

let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?;

let response = MessagesResponse {
messages: combined_messages,
next_page_token: Some(new_page_token),
next_page_token,
};

Ok(Response::new(response))
Expand Down Expand Up @@ -2446,13 +2456,11 @@ impl HubService for MyHubService {

let next_page_tokens: Vec<Option<Vec<u8>>> =
pages.into_iter().map(|page| page.next_page_token).collect();

let new_page_token = serde_json::to_vec(&next_page_tokens)
.map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?;
let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?;

let response = MessagesResponse {
messages: combined_messages,
next_page_token: Some(new_page_token),
next_page_token,
};

Ok(Response::new(response))
Expand Down
113 changes: 113 additions & 0 deletions src/network/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,119 @@ mod tests {
test_helper::assert_contains_all_messages(&response, &[&reply_1, &reply_3_another_shard]);
}

#[tokio::test]
async fn test_get_casts_by_parent_returns_none_token_when_exhausted() {
// Regression test for the cross-shard pagination bug: when every shard
// has finished paginating, the per-shard tokens are all None, but the
// server was serializing `[null, null]` and returning it as a page
// token. Clients then re-sent it and received the same results again,
// looping forever. The token should be None when nothing is left.
Comment thread
manan19 marked this conversation as resolved.
let (
_stores,
_senders,
[mut engine1, mut engine2],
_block_engine,
service,
_shard_decision_tx,
_block_decision_tx,
) = make_server(None).await;
test_helper::register_user(
SHARD1_FID,
test_helper::default_signer(),
test_helper::default_custody_address(),
&mut engine1,
)
.await;
test_helper::register_user(
SHARD2_FID,
test_helper::default_signer(),
test_helper::default_custody_address(),
&mut engine2,
)
.await;
let original_cast =
messages_factory::casts::create_cast_add(SHARD1_FID, "test", None, None);
let timestamp = original_cast.data.as_ref().unwrap().timestamp;
let reply_shard1 = messages_factory::casts::create_cast_with_parent(
SHARD1_FID,
"reply shard1",
SHARD1_FID,
&original_cast.hash,
Some(timestamp + 1),
None,
);
let reply_shard2 = messages_factory::casts::create_cast_with_parent(
SHARD2_FID,
"reply shard2",
SHARD1_FID,
&original_cast.hash,
Some(timestamp + 2),
None,
);
test_helper::commit_message(&mut engine1, &original_cast).await;
test_helper::commit_message(&mut engine1, &reply_shard1).await;
test_helper::commit_message(&mut engine2, &reply_shard2).await;

// page_size large enough to drain both shards in one call.
let response = service
.get_casts_by_parent(Request::new(proto::CastsByParentRequest {
parent: Some(proto::casts_by_parent_request::Parent::ParentCastId(
proto::CastId {
fid: SHARD1_FID,
hash: original_cast.hash.clone(),
},
)),
page_size: Some(10),
page_token: None,
reverse: None,
}))
.await
.unwrap();
test_helper::assert_contains_all_messages(&response, &[&reply_shard1, &reply_shard2]);
assert!(
response.get_ref().next_page_token.is_none(),
"next_page_token should be None when all shards are exhausted, got {:?}",
response.get_ref().next_page_token,
);
}

#[tokio::test]
async fn test_get_events_returns_none_token_when_exhausted() {
// Same regression as test_get_casts_by_parent_returns_none_token_when_exhausted
// but exercises the get_events code path so we cover a second call site
// of the shared helper.
let (
stores,
_senders,
_engines,
_block_engine,
service,
_shard_decision_tx,
_block_decision_tx,
) = make_server(None).await;

write_events_to_db(stores.get(&1u32).unwrap().shard_store.db.clone(), 3).await;

let response = service
.get_events(Request::new(proto::EventsRequest {
start_id: 0,
shard_index: None,
stop_id: None,
page_size: Some(100),
page_token: None,
reverse: None,
}))
.await
.unwrap();
let response = response.get_ref();
assert_eq!(response.events.len(), 3);
assert!(
response.next_page_token.is_none(),
"next_page_token should be None when all shards are exhausted, got {:?}",
response.next_page_token,
);
}

#[tokio::test]
async fn test_storage_limits() {
// Works with no storage
Expand Down
Loading