From d0c190153453c4ffcf7865bdd1f20ecc7756d118 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Wed, 1 Jul 2026 15:39:41 +0200 Subject: [PATCH] fix(collator): fix validation for already skipped blocks --- .../src/validator/impls/std_impl/session.rs | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/collator/src/validator/impls/std_impl/session.rs b/collator/src/validator/impls/std_impl/session.rs index 72edd41a0..36009fd04 100644 --- a/collator/src/validator/impls/std_impl/session.rs +++ b/collator/src/validator/impls/std_impl/session.rs @@ -11,7 +11,7 @@ use backon::BackoffBuilder; use futures_util::stream::FuturesUnordered; use futures_util::{Future, StreamExt}; use scc::TreeIndex; -use tokio::sync::{Notify, Semaphore}; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use tracing::Instrument; use tycho_crypto::ed25519::KeyPair; @@ -100,8 +100,7 @@ impl ValidatorSession { validators: Arc::new(validators), block_signatures: TreeIndex::new(), cached_signatures: TreeIndex::new(), - cancelled: AtomicBool::new(false), - cancelled_signal: Notify::new(), + cancelled: CancellationToken::new(), events_scope, }); @@ -139,6 +138,7 @@ impl ValidatorSession { own_validator_idx, state, min_seqno: AtomicU32::new(info.start_block_seqno), + cancelled_until: AtomicU32::new(0), }), }; @@ -160,12 +160,17 @@ impl ValidatorSession { } pub fn cancel(&self) { + self.inner + .cancelled_until + .store(u32::MAX, Ordering::Release); self.inner.state.events_scope.finish(); - self.inner.state.cancelled.store(true, Ordering::Release); - self.inner.state.cancelled_signal.notify_waiters(); + self.inner.state.cancelled.cancel(); } pub fn cancel_until(&self, block_seqno: u32) { + self.inner + .cancelled_until + .fetch_max(block_seqno, Ordering::Release); self.inner .min_seqno .fetch_max(block_seqno, Ordering::Release); @@ -203,6 +208,16 @@ impl ValidatorSession { }, ); + // Fast path check that block is already cancelled. 
+ if block_id.seqno <= self.inner.cancelled_until.load(Ordering::Acquire) { + tracing::info!( + target: tracing_targets::VALIDATOR, + block_seqno = block_id.seqno, + "block cancelled before start", + ); + return Ok(ValidationStatus::Skipped); + } + self.inner .min_seqno .fetch_max(block_id.seqno, Ordering::Release); @@ -222,7 +237,11 @@ impl ValidatorSession { Some(cached) => self.reuse_signatures(block_id, &events_scope, cached).await, None => self.prepare_new_signatures(block_id, &events_scope), } - .build(block_id, state.weight_threshold); + .build( + state.cancelled.child_token(), + block_id, + state.weight_threshold, + ); // Allow only one validation at a time if state @@ -239,6 +258,20 @@ impl ValidatorSession { ); } + // Check once more after the block is registered in `block_signatures`. + if block_id.seqno <= self.inner.cancelled_until.load(Ordering::Acquire) + || block_signatures.cancelled.is_cancelled() + { + tracing::info!( + target: tracing_targets::VALIDATOR, + block_seqno = block_id.seqno, + "block cancelled before start", + ); + block_signatures.cancelled.cancel(); + state.block_signatures.remove(&block_id.seqno); + return Ok(ValidationStatus::Skipped); + } + // NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures // only after we have inserted the block. // @@ -288,16 +321,8 @@ impl ValidatorSession { futures.push(JoinTask::new(fut.instrument(tracing::Span::current()))); } - let mut session_cancelled = pin!(state.cancelled_signal.notified()); - if state.cancelled.load(Ordering::Acquire) { - tracing::trace!( - target: tracing_targets::VALIDATOR, - block_seqno = block_id.seqno, - "session cancelled", - ); - return Ok(ValidationStatus::Skipped); - } - + // NOTE: This cancellation token is a child of the session token so it is cancelled + // when either the block or the session is cancelled. 
let mut block_cancelled = pin!(block_signatures.cancelled.cancelled()); while total_weight < state.weight_threshold { let res = tokio::select! { @@ -305,16 +330,8 @@ impl ValidatorSession { Some(res) => res, None => anyhow::bail!("no more signatures to collect but the threshold is not reached"), }, - _ = &mut session_cancelled => { - tracing::trace!( - target: tracing_targets::VALIDATOR, - block_seqno = block_id.seqno, - "session cancelled", - ); - return Ok(ValidationStatus::Skipped) - }, _ = &mut block_cancelled => { - tracing::trace!( + tracing::info!( target: tracing_targets::VALIDATOR, block_seqno = block_id.seqno, "block cancelled", @@ -497,6 +514,7 @@ impl fmt::Debug for DebugLogValidatorSesssion<'_> { .field("weight_threshold", &self.0.inner.state.weight_threshold) .field("start_block_seqno", &self.0.inner.start_block_seqno) .field("min_seqno", &self.0.inner.min_seqno) + .field("cancelled_until", &self.0.inner.cancelled_until) .field("validators", &self.0.inner.state.validators) .finish() } @@ -513,6 +531,7 @@ struct Inner { own_weight: u64, state: Arc, min_seqno: AtomicU32, + cancelled_until: AtomicU32, } impl Inner { @@ -659,8 +678,7 @@ struct SessionState { validators: Arc>, block_signatures: TreeIndex>, cached_signatures: TreeIndex>, - cancelled: AtomicBool, - cancelled_signal: Notify, + cancelled: CancellationToken, events_scope: ValidatorSessionScope, } @@ -761,7 +779,12 @@ struct BlockSignaturesBuilder { } impl BlockSignaturesBuilder { - fn build(self, block_id: &BlockId, weight_threshold: u64) -> Arc { + fn build( + self, + cancelled: CancellationToken, + block_id: &BlockId, + weight_threshold: u64, + ) -> Arc { metrics::gauge!(METRIC_BLOCK_SLOTS).increment(1); Arc::new(BlockSignatures { @@ -770,7 +793,7 @@ impl BlockSignaturesBuilder { other_signatures: self.other_signatures, total_weight: AtomicU64::new(self.total_weight), validated: AtomicBool::new(self.total_weight >= weight_threshold), - cancelled: CancellationToken::new(), + cancelled, 
events_scope: self.events_scope, }) } @@ -828,7 +851,7 @@ impl ExchangeSignatures for SessionState { block_seqno: u32, signature: Arc<[u8; 64]>, ) -> Result { - if self.cancelled.load(Ordering::Acquire) { + if self.cancelled.is_cancelled() { return Err(ValidationError::Cancelled); }