Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 53 additions & 30 deletions collator/src/validator/impls/std_impl/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use backon::BackoffBuilder;
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, StreamExt};
use scc::TreeIndex;
use tokio::sync::{Notify, Semaphore};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tycho_crypto::ed25519::KeyPair;
Expand Down Expand Up @@ -100,8 +100,7 @@ impl ValidatorSession {
validators: Arc::new(validators),
block_signatures: TreeIndex::new(),
cached_signatures: TreeIndex::new(),
cancelled: AtomicBool::new(false),
cancelled_signal: Notify::new(),
cancelled: CancellationToken::new(),
events_scope,
});

Expand Down Expand Up @@ -139,6 +138,7 @@ impl ValidatorSession {
own_validator_idx,
state,
min_seqno: AtomicU32::new(info.start_block_seqno),
cancelled_until: AtomicU32::new(0),
}),
};

Expand All @@ -160,12 +160,17 @@ impl ValidatorSession {
}

pub fn cancel(&self) {
self.inner
.cancelled_until
.store(u32::MAX, Ordering::Release);
self.inner.state.events_scope.finish();
self.inner.state.cancelled.store(true, Ordering::Release);
self.inner.state.cancelled_signal.notify_waiters();
self.inner.state.cancelled.cancel();
}

pub fn cancel_until(&self, block_seqno: u32) {
self.inner
.cancelled_until
.fetch_max(block_seqno, Ordering::Release);
self.inner
.min_seqno
.fetch_max(block_seqno, Ordering::Release);
Expand Down Expand Up @@ -203,6 +208,16 @@ impl ValidatorSession {
},
);

// Fast path check that block is already cancelled.
if block_id.seqno <= self.inner.cancelled_until.load(Ordering::Acquire) {
tracing::info!(
target: tracing_targets::VALIDATOR,
block_seqno = block_id.seqno,
"block cancelled before start",
);
return Ok(ValidationStatus::Skipped);
}

self.inner
.min_seqno
.fetch_max(block_id.seqno, Ordering::Release);
Expand All @@ -222,7 +237,11 @@ impl ValidatorSession {
Some(cached) => self.reuse_signatures(block_id, &events_scope, cached).await,
None => self.prepare_new_signatures(block_id, &events_scope),
}
.build(block_id, state.weight_threshold);
.build(
state.cancelled.child_token(),
block_id,
state.weight_threshold,
);

// Allow only one validation at a time
if state
Expand All @@ -239,6 +258,20 @@ impl ValidatorSession {
);
}

// Check once more after the block is registered in `block_signatures`.
if block_id.seqno <= self.inner.cancelled_until.load(Ordering::Acquire)
|| block_signatures.cancelled.is_cancelled()
{
tracing::info!(
target: tracing_targets::VALIDATOR,
block_seqno = block_id.seqno,
"block cancelled before start",
);
block_signatures.cancelled.cancel();
state.block_signatures.remove(&block_id.seqno);
return Ok(ValidationStatus::Skipped);
}

// NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures
// only after we have inserted the block.
//
Expand Down Expand Up @@ -288,33 +321,17 @@ impl ValidatorSession {
futures.push(JoinTask::new(fut.instrument(tracing::Span::current())));
}

let mut session_cancelled = pin!(state.cancelled_signal.notified());
if state.cancelled.load(Ordering::Acquire) {
tracing::trace!(
target: tracing_targets::VALIDATOR,
block_seqno = block_id.seqno,
"session cancelled",
);
return Ok(ValidationStatus::Skipped);
}

// NOTE: This cancellation token is a child of session token so it is cancelled
// when either the block or the session are cancelled.
let mut block_cancelled = pin!(block_signatures.cancelled.cancelled());
while total_weight < state.weight_threshold {
let res = tokio::select! {
res = futures.next() => match res {
Some(res) => res,
None => anyhow::bail!("no more signatures to collect but the threshold is not reached"),
},
_ = &mut session_cancelled => {
tracing::trace!(
target: tracing_targets::VALIDATOR,
block_seqno = block_id.seqno,
"session cancelled",
);
return Ok(ValidationStatus::Skipped)
},
_ = &mut block_cancelled => {
tracing::trace!(
tracing::info!(
target: tracing_targets::VALIDATOR,
block_seqno = block_id.seqno,
"block cancelled",
Expand Down Expand Up @@ -497,6 +514,7 @@ impl fmt::Debug for DebugLogValidatorSesssion<'_> {
.field("weight_threshold", &self.0.inner.state.weight_threshold)
.field("start_block_seqno", &self.0.inner.start_block_seqno)
.field("min_seqno", &self.0.inner.min_seqno)
.field("cancelled_until", &self.0.inner.cancelled_until)
.field("validators", &self.0.inner.state.validators)
.finish()
}
Expand All @@ -513,6 +531,7 @@ struct Inner {
own_weight: u64,
state: Arc<SessionState>,
min_seqno: AtomicU32,
cancelled_until: AtomicU32,
}

impl Inner {
Expand Down Expand Up @@ -659,8 +678,7 @@ struct SessionState {
validators: Arc<FastHashMap<PeerId, BriefValidatorDescr>>,
block_signatures: TreeIndex<u32, Arc<BlockSignatures>>,
cached_signatures: TreeIndex<u32, Arc<CachedSignatures>>,
cancelled: AtomicBool,
cancelled_signal: Notify,
cancelled: CancellationToken,
events_scope: ValidatorSessionScope,
}

Expand Down Expand Up @@ -761,7 +779,12 @@ struct BlockSignaturesBuilder {
}

impl BlockSignaturesBuilder {
fn build(self, block_id: &BlockId, weight_threshold: u64) -> Arc<BlockSignatures> {
fn build(
self,
cancelled: CancellationToken,
block_id: &BlockId,
weight_threshold: u64,
) -> Arc<BlockSignatures> {
metrics::gauge!(METRIC_BLOCK_SLOTS).increment(1);

Arc::new(BlockSignatures {
Expand All @@ -770,7 +793,7 @@ impl BlockSignaturesBuilder {
other_signatures: self.other_signatures,
total_weight: AtomicU64::new(self.total_weight),
validated: AtomicBool::new(self.total_weight >= weight_threshold),
cancelled: CancellationToken::new(),
cancelled,
events_scope: self.events_scope,
})
}
Expand Down Expand Up @@ -828,7 +851,7 @@ impl ExchangeSignatures for SessionState {
block_seqno: u32,
signature: Arc<[u8; 64]>,
) -> Result<proto::Exchange, Self::Err> {
if self.cancelled.load(Ordering::Acquire) {
if self.cancelled.is_cancelled() {
return Err(ValidationError::Cancelled);
}

Expand Down
Loading