Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions src/connectors/onchain_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const RETRY_TIMEOUT_SECONDS: u64 = 10;

const BASE_BLOCK_PAGE_SIZE: u64 = 8000; // Alchemy max is 10K

// Number of blocks to go back from the latest block when starting live sync by default
const LIVE_SYNC_BLOCK_OFFSET: u64 = 500;

#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub rpc_url: String,
Expand Down Expand Up @@ -490,6 +493,26 @@ impl Subscriber {
}
}

// Picks where to start live-sync when no explicit start_block is configured.
//
// - If the DB already has a checkpoint, resume there: jumping forward would
// silently skip onchain events between the checkpoint and the chain tip.
// - Otherwise (fresh node), start at `tip - LIVE_SYNC_BLOCK_OFFSET`, floored
// at `first_block` so we never query before the contracts existed.
fn live_sync_start_block(
latest_block_on_chain: u64,
latest_block_in_db: u64,
first_block: u64,
) -> u64 {
if latest_block_in_db > 0 {
latest_block_in_db
} else {
latest_block_on_chain
.saturating_sub(LIVE_SYNC_BLOCK_OFFSET)
.max(first_block)
}
}

fn chain_id(chain: node_local_state::Chain) -> u32 {
match chain {
node_local_state::Chain::Optimism => OP_MAINNET_CHAIN_ID,
Expand Down Expand Up @@ -1267,8 +1290,11 @@ impl Subscriber {
let live_sync_block;
match self.start_block_number {
None => {
// By default, start from the first block or the latest block in the db. Whichever is higher
live_sync_block = Some(Self::first_block(self.chain).max(latest_block_in_db));
live_sync_block = Some(Self::live_sync_start_block(
latest_block_on_chain,
latest_block_in_db,
Self::first_block(self.chain),
));
}
Some(start_block_number) => {
let historical_sync_start_block = latest_block_in_db.max(start_block_number);
Expand Down Expand Up @@ -1392,4 +1418,51 @@ mod tests {

assert_eq!(result, Some(9152));
}

mod live_sync_start_block {
use super::*;

const FIRST_BLOCK: u64 = 100_000_000;
const TIP: u64 = 110_000_000;

#[test]
fn empty_db_starts_at_tip_minus_offset() {
let start = Subscriber::live_sync_start_block(TIP, 0, FIRST_BLOCK);
assert_eq!(start, TIP - LIVE_SYNC_BLOCK_OFFSET);
}

#[test]
fn empty_db_clamps_to_first_block_when_tip_is_close_to_genesis() {
// Tip is below first_block + offset, so the offset would underflow past
// contract deployment. Must clamp to first_block.
let tip = FIRST_BLOCK + 100;
let start = Subscriber::live_sync_start_block(tip, 0, FIRST_BLOCK);
assert_eq!(start, FIRST_BLOCK);
}

#[test]
fn empty_db_clamps_to_first_block_when_tip_is_below_offset() {
// Pathological: tip < LIVE_SYNC_BLOCK_OFFSET. saturating_sub returns 0,
// which must still be clamped up to first_block.
let start = Subscriber::live_sync_start_block(50, 0, FIRST_BLOCK);
assert_eq!(start, FIRST_BLOCK);
}

#[test]
fn db_ahead_of_default_resumes_from_db() {
let db = TIP - 10;
let start = Subscriber::live_sync_start_block(TIP, db, FIRST_BLOCK);
assert_eq!(start, db);
}

#[test]
fn db_behind_default_still_resumes_from_db() {
// Regression guard: if the DB checkpoint is far behind tip - offset
// (long downtime, stale snapshot), we MUST resume at the checkpoint —
// jumping to tip - offset would silently skip onchain events in the gap.
let db = TIP - LIVE_SYNC_BLOCK_OFFSET - 10_000;
let start = Subscriber::live_sync_start_block(TIP, db, FIRST_BLOCK);
assert_eq!(start, db);
}
}
}
Loading