diff --git a/src/network/http_server.rs b/src/network/http_server.rs index ea4ee29f..f46610dd 100644 --- a/src/network/http_server.rs +++ b/src/network/http_server.rs @@ -1143,9 +1143,8 @@ pub struct HubEvent { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct EventsResponse { pub events: Vec, - // TODO: What's the best way to support next page token with multiple shards? - // #[serde(rename = "nextPageToken", skip_serializing_if = "Option::is_none")] - // pub next_page_token: Option, + #[serde(rename = "nextPageToken", skip_serializing_if = "Option::is_none")] + pub next_page_token: Option, } #[allow(non_snake_case)] @@ -1983,12 +1982,9 @@ fn map_proto_messages_response_to_json_paged_response( .iter() .map(|m| map_proto_message_to_json_message(m.clone()).unwrap()) .collect(), - next_page_token: Some( - messages_response - .next_page_token - .map(|t| BASE64_STANDARD.encode(t)) - .unwrap_or_else(|| "".to_string()), - ), + next_page_token: messages_response + .next_page_token + .map(|t| BASE64_STANDARD.encode(t)), }) } @@ -2968,6 +2964,9 @@ where .iter() .map(|e| map_proto_hub_event_to_json_hub_event(e.clone()).unwrap()) .collect(), + next_page_token: events_response + .next_page_token + .map(|t| BASE64_STANDARD.encode(t)), }) } @@ -3557,3 +3556,38 @@ where } }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn paged_response_omits_next_page_token_when_proto_token_is_none() { + let response = proto::MessagesResponse { + messages: vec![], + next_page_token: None, + }; + let mapped = map_proto_messages_response_to_json_paged_response(response).unwrap(); + let json = serde_json::to_value(&mapped).unwrap(); + assert!( + json.get("nextPageToken").is_none(), + "expected nextPageToken to be omitted, got {json}", + ); + } + + #[test] + fn paged_response_base64_encodes_next_page_token_when_present() { + let token = vec![0u8, 1, 2, 253, 254, 255]; + let expected = BASE64_STANDARD.encode(&token); + let response = proto::MessagesResponse { + messages: vec![], + next_page_token: Some(token), + }; + let mapped = map_proto_messages_response_to_json_paged_response(response).unwrap(); + let json = serde_json::to_value(&mapped).unwrap(); + assert_eq!( + json.get("nextPageToken").and_then(|v| v.as_str()), + Some(expected.as_str()), + ); + } +} diff --git a/src/network/server.rs b/src/network/server.rs index 4ff0066b..ffb26bbd 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -828,6 +828,20 @@ impl MyHubService { } } +/// Helper function to serialize page tokens only if at least one shard has more data. +/// Returns None if all shards have finished pagination (all tokens are None). +fn serialize_page_token_if_any_shard_has_next( + next_page_tokens: Vec>>, +) -> Result>, Status> { + if next_page_tokens.iter().any(|token| token.is_some()) { + let new_page_token = serde_json::to_vec(&next_page_tokens) + .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + Ok(Some(new_page_token)) + } else { + Ok(None) + } +} + #[tonic::async_trait] impl HubService for MyHubService { async fn submit_message( @@ -1467,11 +1481,11 @@ impl HubService for MyHubService { pages.iter().flat_map(|page| page.events.clone()).collect(); let next_page_tokens: Vec>> = pages.into_iter().map(|page| page.next_page_token).collect(); - let new_page_token = serde_json::to_vec(&next_page_tokens) - .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?; + let response = EventsResponse { events: combined_events, - next_page_token: Some(new_page_token), + next_page_token, }; Ok(Response::new(response)) @@ -1800,11 +1814,11 @@ impl HubService for MyHubService { .collect(); let next_page_tokens: Vec>> = pages.into_iter().map(|page| page.next_page_token).collect(); - let new_page_token = serde_json::to_vec(&next_page_tokens) - .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?; + let response = MessagesResponse { messages: combined_messages, - next_page_token: Some(new_page_token), + next_page_token, }; Ok(Response::new(response)) @@ -1859,13 +1873,11 @@ impl HubService for MyHubService { let next_page_tokens: Vec>> = pages.into_iter().map(|page| page.next_page_token).collect(); - - let new_page_token = serde_json::to_vec(&next_page_tokens) - .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?; let response = MessagesResponse { messages: combined_messages, - next_page_token: Some(new_page_token), + next_page_token, }; Ok(Response::new(response)) @@ -1937,13 +1949,11 @@ impl HubService for MyHubService { let next_page_tokens: Vec>> = pages.into_iter().map(|page| page.next_page_token).collect(); - - let new_page_token = serde_json::to_vec(&next_page_tokens) - .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?; let response = MessagesResponse { messages: combined_messages, - next_page_token: Some(new_page_token), + next_page_token, }; Ok(Response::new(response)) @@ -2446,13 +2456,11 @@ impl HubService for MyHubService { let next_page_tokens: Vec>> = pages.into_iter().map(|page| page.next_page_token).collect(); - - let new_page_token = serde_json::to_vec(&next_page_tokens) - .map_err(|e| Status::internal(format!("Failed to serialize next_page_token: {}", e)))?; + let next_page_token = serialize_page_token_if_any_shard_has_next(next_page_tokens)?; let response = MessagesResponse { messages: combined_messages, - next_page_token: Some(new_page_token), + next_page_token, }; Ok(Response::new(response)) diff --git a/src/network/server_tests.rs b/src/network/server_tests.rs index c8a9b44b..bc6827ed 100644 --- a/src/network/server_tests.rs +++ b/src/network/server_tests.rs @@ -1588,6 +1588,119 @@ mod tests { test_helper::assert_contains_all_messages(&response, &[&reply_1, &reply_3_another_shard]); } + #[tokio::test] + async fn test_get_casts_by_parent_returns_none_token_when_exhausted() { + // Regression test for the cross-shard pagination bug: when every shard + // has finished paginating, the per-shard tokens are all None, but the + // server was serializing `[null, null]` and returning it as a page + // token. Clients then re-sent it and received the same results again, + // looping forever. The token should be None when nothing is left. + let ( + _stores, + _senders, + [mut engine1, mut engine2], + _block_engine, + service, + _shard_decision_tx, + _block_decision_tx, + ) = make_server(None).await; + test_helper::register_user( + SHARD1_FID, + test_helper::default_signer(), + test_helper::default_custody_address(), + &mut engine1, + ) + .await; + test_helper::register_user( + SHARD2_FID, + test_helper::default_signer(), + test_helper::default_custody_address(), + &mut engine2, + ) + .await; + let original_cast = + messages_factory::casts::create_cast_add(SHARD1_FID, "test", None, None); + let timestamp = original_cast.data.as_ref().unwrap().timestamp; + let reply_shard1 = messages_factory::casts::create_cast_with_parent( + SHARD1_FID, + "reply shard1", + SHARD1_FID, + &original_cast.hash, + Some(timestamp + 1), + None, + ); + let reply_shard2 = messages_factory::casts::create_cast_with_parent( + SHARD2_FID, + "reply shard2", + SHARD1_FID, + &original_cast.hash, + Some(timestamp + 2), + None, + ); + test_helper::commit_message(&mut engine1, &original_cast).await; + test_helper::commit_message(&mut engine1, &reply_shard1).await; + test_helper::commit_message(&mut engine2, &reply_shard2).await; + + // page_size large enough to drain both shards in one call. + let response = service + .get_casts_by_parent(Request::new(proto::CastsByParentRequest { + parent: Some(proto::casts_by_parent_request::Parent::ParentCastId( + proto::CastId { + fid: SHARD1_FID, + hash: original_cast.hash.clone(), + }, + )), + page_size: Some(10), + page_token: None, + reverse: None, + })) + .await + .unwrap(); + test_helper::assert_contains_all_messages(&response, &[&reply_shard1, &reply_shard2]); + assert!( + response.get_ref().next_page_token.is_none(), + "next_page_token should be None when all shards are exhausted, got {:?}", + response.get_ref().next_page_token, + ); + } + + #[tokio::test] + async fn test_get_events_returns_none_token_when_exhausted() { + // Same regression as test_get_casts_by_parent_returns_none_token_when_exhausted + // but exercises the get_events code path so we cover a second call site + // of the shared helper. + let ( + stores, + _senders, + _engines, + _block_engine, + service, + _shard_decision_tx, + _block_decision_tx, + ) = make_server(None).await; + + write_events_to_db(stores.get(&1u32).unwrap().shard_store.db.clone(), 3).await; + + let response = service + .get_events(Request::new(proto::EventsRequest { + start_id: 0, + shard_index: None, + stop_id: None, + page_size: Some(100), + page_token: None, + reverse: None, + })) + .await + .unwrap(); + let response = response.get_ref(); + assert_eq!(response.events.len(), 3); + assert!( + response.next_page_token.is_none(), + "next_page_token should be None when all shards are exhausted, got {:?}", + response.next_page_token, + ); + } + #[tokio::test] async fn test_storage_limits() { // Works with no storage