diff --git a/src/connectors/onchain_events/mod.rs b/src/connectors/onchain_events/mod.rs index 4dd8de54..7036acc4 100644 --- a/src/connectors/onchain_events/mod.rs +++ b/src/connectors/onchain_events/mod.rs @@ -92,6 +92,9 @@ const RETRY_TIMEOUT_SECONDS: u64 = 10; const BASE_BLOCK_PAGE_SIZE: u64 = 8000; // Alchemy max is 10K +// Number of blocks to go back from the latest block when starting live sync by default +const LIVE_SYNC_BLOCK_OFFSET: u64 = 500; + #[derive(Debug, Serialize, Deserialize)] pub struct Config { pub rpc_url: String, @@ -490,6 +493,26 @@ impl Subscriber { } } + // Picks where to start live-sync when no explicit start_block is configured. + // + // - If the DB already has a checkpoint, resume there: jumping forward would + // silently skip onchain events between the checkpoint and the chain tip. + // - Otherwise (fresh node), start at `tip - LIVE_SYNC_BLOCK_OFFSET`, floored + // at `first_block` so we never query before the contracts existed. + fn live_sync_start_block( + latest_block_on_chain: u64, + latest_block_in_db: u64, + first_block: u64, + ) -> u64 { + if latest_block_in_db > 0 { + latest_block_in_db + } else { + latest_block_on_chain + .saturating_sub(LIVE_SYNC_BLOCK_OFFSET) + .max(first_block) + } + } + fn chain_id(chain: node_local_state::Chain) -> u32 { match chain { node_local_state::Chain::Optimism => OP_MAINNET_CHAIN_ID, @@ -1267,8 +1290,11 @@ impl Subscriber { let live_sync_block; match self.start_block_number { None => { - // By default, start from the first block or the latest block in the db. Whichever is higher - live_sync_block = Some(Self::first_block(self.chain).max(latest_block_in_db)); + live_sync_block = Some(Self::live_sync_start_block( + latest_block_on_chain, + latest_block_in_db, + Self::first_block(self.chain), + )); } Some(start_block_number) => { let historical_sync_start_block = latest_block_in_db.max(start_block_number); @@ -1392,4 +1418,51 @@ mod tests { assert_eq!(result, Some(9152)); } + + mod live_sync_start_block { + use super::*; + + const FIRST_BLOCK: u64 = 100_000_000; + const TIP: u64 = 110_000_000; + + #[test] + fn empty_db_starts_at_tip_minus_offset() { + let start = Subscriber::live_sync_start_block(TIP, 0, FIRST_BLOCK); + assert_eq!(start, TIP - LIVE_SYNC_BLOCK_OFFSET); + } + + #[test] + fn empty_db_clamps_to_first_block_when_tip_is_close_to_genesis() { + // Tip is below first_block + offset, so the offset would underflow past + // contract deployment. Must clamp to first_block. + let tip = FIRST_BLOCK + 100; + let start = Subscriber::live_sync_start_block(tip, 0, FIRST_BLOCK); + assert_eq!(start, FIRST_BLOCK); + } + + #[test] + fn empty_db_clamps_to_first_block_when_tip_is_below_offset() { + // Pathological: tip < LIVE_SYNC_BLOCK_OFFSET. saturating_sub returns 0, + // which must still be clamped up to first_block. + let start = Subscriber::live_sync_start_block(50, 0, FIRST_BLOCK); + assert_eq!(start, FIRST_BLOCK); + } + + #[test] + fn db_ahead_of_default_resumes_from_db() { + let db = TIP - 10; + let start = Subscriber::live_sync_start_block(TIP, db, FIRST_BLOCK); + assert_eq!(start, db); + } + + #[test] + fn db_behind_default_still_resumes_from_db() { + // Regression guard: if the DB checkpoint is far behind tip - offset + // (long downtime, stale snapshot), we MUST resume at the checkpoint — + // jumping to tip - offset would silently skip onchain events in the gap. + let db = TIP - LIVE_SYNC_BLOCK_OFFSET - 10_000; + let start = Subscriber::live_sync_start_block(TIP, db, FIRST_BLOCK); + assert_eq!(start, db); + } + } }