diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs index 99af2ee481..93b1201354 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs @@ -215,7 +215,9 @@ import Ouroboros.Consensus.Protocol.Praos (Praos, PraosCrypto) import Ouroboros.Consensus.Protocol.TPraos (TPraos) import Ouroboros.Consensus.Shelley.Eras import Ouroboros.Consensus.Shelley.Ledger (ShelleyBlock, ShelleyCompatible) -import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock (resolveLeiosBlock)) +import Ouroboros.Consensus.Storage.LedgerDB + ( ResolveLeiosBlock (certifiedEbFromHeader, resolveLeiosBlock) + ) import Ouroboros.Consensus.TypeFamilyWrappers {------------------------------------------------------------------------------- @@ -1403,3 +1405,7 @@ instance injectConwayBlock <$> resolveLeiosBlock db conwayHdrSt conwayBlk resolveLeiosBlock _ _ blk = return blk + + certifiedEbFromHeader (HeaderConway conwayHdr) = + certifiedEbFromHeader conwayHdr + certifiedEbFromHeader _ = Nothing diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs index 0fc2e95a4e..630e03656d 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs @@ -64,6 +64,7 @@ import Control.Exception (assert) import qualified Data.ByteString.Short as Short import Data.Functor.These (These1 (..)) import qualified Data.Map.Strict as Map +import Data.Maybe (listToMaybe) import Data.SOP.BasicFunctors import 
Data.SOP.Counting import Data.SOP.Functors (Flip (..)) @@ -814,11 +815,12 @@ protocolInfoCardano paramsCardano (Shelley.ShelleyStorageConfig tpraosSlotsPerKESPeriod k) , topLevelConfigCheckpoints = cardanoCheckpoints , -- FIXME: REMOVE THIS. Accesses and re-uses KES signing key material. + -- For nodes without Shelley-based leader credentials (e.g. relays), + -- there is no key material to re-use, so the voting key is Nothing. topLevelConfigVotingKey = - Just - . rawSerialiseUnsoundPureSignKeyKES + rawSerialiseUnsoundPureSignKeyKES . shelleyLeaderCredentialsInitSignKey - $ credssShelleyBased !! 0 + <$> listToMaybe credssShelleyBased } -- When the initial ledger state is not in the Byron era, register various diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs index d4699e2d90..a6d5f68494 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs @@ -94,6 +94,7 @@ forgeShelleyBlock hotKey cbl mayLeiosInfo ForgeBlockArgs{..} = do (SL.bodyBytesSize protocolVersion body) protocolVersion (snd <$> mayForgedEbAnn) + Nothing let blk = mkShelleyBlock $ @@ -116,6 +117,7 @@ forgeShelleyBlock hotKey cbl mayLeiosInfo ForgeBlockArgs{..} = do (SL.bodyBytesSize protocolVersion body) protocolVersion Nothing -- FIXME(bladyjoker): Skip announcement when certifying https://github.com/input-output-hk/ouroboros-leios/issues/838 + (Just $ Leios.leiosCertificateEbPoint cert) let blk = mkShelleyBlock $ SL.Block diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs index 0f61f331d5..ab2ea25fde 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs +++ 
b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs @@ -149,6 +149,10 @@ import Ouroboros.Consensus.Protocol.Praos , PraosState (praosStateLastSlot, praosStateLeios) , withOriginToSlotNo ) +import Ouroboros.Consensus.Protocol.Praos.Header + ( Header (headerBody) + , HeaderBody (hbMayCertifiedEb) + ) import Ouroboros.Consensus.Protocol.TPraos (TPraos) import Ouroboros.Consensus.Shelley.Ledger.Block import Ouroboros.Consensus.Shelley.Ledger.Config @@ -158,6 +162,7 @@ import Ouroboros.Consensus.Shelley.Protocol.Abstract , envelopeChecks , mkHeaderView ) +import Ouroboros.Consensus.Shelley.Protocol.Praos () import Ouroboros.Consensus.Storage.LedgerDB import Ouroboros.Consensus.Util.CBOR ( decodeWithOrigin @@ -372,6 +377,9 @@ instance } resolveLeiosBlock _leiosDb _hdrSt blk = return blk + certifiedEbFromHeader (ShelleyHeader rawHdr _) = + hbMayCertifiedEb (headerBody rawHdr) + deserialiseShelleyTx :: forall era. ShelleyBasedEra era => BS.ByteString -> Core.Tx era deserialiseShelleyTx bs = case CB.decodeFullAnnotator (Core.eraProtVerLow @era) "Leios Tx" CB.decCBOR (BL.fromStrict bs) of Left err -> error $ "Failed to deserialise Tx era: " <> show err diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs index 4395a1d7b9..08a548b61a 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs @@ -48,7 +48,7 @@ import Data.Kind (Type) import Data.Typeable (Typeable) import Data.Word (Word64) import GHC.Generics (Generic) -import LeiosDemoTypes (EbAnnouncement) +import LeiosDemoTypes (EbAnnouncement, LeiosPoint) import NoThunks.Class (NoThunks) import Numeric.Natural (Natural) import Ouroboros.Consensus.Protocol.Abstract @@ -165,6 +165,8 @@ class 
ProtocolHeaderSupportsKES proto where ProtVer -> -- | Leios EB announcement Maybe EbAnnouncement -> + -- | Leios certified EB + Maybe LeiosPoint -> m (ShelleyProtocolHeader proto) -- | ProtocolHeaderSupportsProtocol` provides support for the concrete diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs index c04c7526c4..2a8306834b 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs @@ -144,7 +144,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (Praos c) where currentKesPeriod - startOfKesPeriod | otherwise = 0 - mkHeader hk cbl il slotNo blockNo prevHash bodyHash bodySize protVer mayEbAnnouncement = do + mkHeader hk cbl il slotNo blockNo prevHash bodyHash bodySize protVer mayEbAnnouncement mayCertifiedEb = do PraosFields{praosSignature, praosToSign} <- forgePraosFields hk cbl il mkBhBodyBytes pure $ Header praosToSign praosSignature where @@ -167,6 +167,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (Praos c) where , hbOCert = praosToSignOCert , hbProtVer = protVer , hbMayEbAnnouncement = mayEbAnnouncement + , hbMayCertifiedEb = mayCertifiedEb } instance PraosCrypto c => ProtocolHeaderSupportsProtocol (Praos c) where @@ -182,6 +183,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsProtocol (Praos c) where , hvSigned = headerBody , hvSignature = headerSig , hvMayEbAnnouncement = hbMayEbAnnouncement headerBody + , hvMayCertifiedEb = hbMayCertifiedEb headerBody } pHeaderIssuer = hbVk . headerBody pHeaderIssueNo = SL.ocertN . hbOCert . 
headerBody diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs index e86cd9188e..319eb12ffa 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs @@ -90,7 +90,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (TPraos c) where currentKesPeriod - startOfKesPeriod | otherwise = 0 - mkHeader hotKey canBeLeader isLeader curSlot curNo prevHash bbHash actualBodySize protVer _mayEbAnnouncement = do + mkHeader hotKey canBeLeader isLeader curSlot curNo prevHash bbHash actualBodySize protVer _mayEbAnnouncement _mayCertifiedEb = do TPraosFields{tpraosSignature, tpraosToSign} <- forgeTPraosFields hotKey canBeLeader isLeader mkBhBody pure $ SL.BHeader tpraosToSign tpraosSignature diff --git a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs index 5eb9c64e62..7d76583e8e 100644 --- a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs +++ b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs @@ -283,6 +283,7 @@ fromShelleyLedgerExamplesPraos , hbOCert = SL.bheaderOCert bhBody , hbProtVer = SL.bprotver bhBody , hbMayEbAnnouncement = Nothing + , hbMayCertifiedEb = Nothing } hSig = coerce bhSig hash = ShelleyHash $ SL.unHashHeader sleHashHeader diff --git a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs index 353a17af0d..45f0cd676e 100644 --- a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs +++ 
b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs @@ -145,6 +145,7 @@ instance , Praos.hbOCert = SL.bheaderOCert bhBody , Praos.hbProtVer = SL.bprotver bhBody , Praos.hbMayEbAnnouncement = Nothing + , Praos.hbMayCertifiedEb = Nothing } hSig = coerce bhSig diff --git a/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs b/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs index f6a9b5ff20..04968ace32 100644 --- a/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs +++ b/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs @@ -100,8 +100,10 @@ import Test.Consensus.Cardano.ProtocolInfo (Era (Conway), hardForkInto) import Test.QuickCheck ( Property , Testable + , choose , conjoin , counterexample + , forAll , tabulate , (.&&.) , (.||.) @@ -137,7 +139,7 @@ import Test.ThreadNet.Network , TraceThreadNetNode (FromLeios, FromLeiosPeer, FromMempool) ) import Test.ThreadNet.TxGen.Cardano (CardanoTxGenExtra (..)) -import Test.ThreadNet.Util.NodeJoinPlan (trivialNodeJoinPlan) +import Test.ThreadNet.Util.NodeJoinPlan (NodeJoinPlan (..), trivialNodeJoinPlan) import Test.ThreadNet.Util.NodeRestarts (noRestarts) import Test.ThreadNet.Util.NodeToNodeVersion (newestVersion) import Test.ThreadNet.Util.NodeTopology (meshNodeTopology) @@ -152,6 +154,8 @@ tests = "Leios ThreadNet" [ adjustQuickCheckTests (`div` 10) $ testProperty "basic functionality" prop_leios + , adjustQuickCheckTests (`div` 10) $ + testProperty "late join" prop_leios_late_join ] -- | Verify a suite of basic Leios ThreadNet invariants in a single run: @@ -183,7 +187,9 @@ prop_leios seed = numSlots = 200 :: Word64 (testOutput, ProtocolInfo{pInfoConfig, pInfoInitLedger}) = - runThreadNet seed (NumSlots numSlots) (NumCoreNodes $ fromIntegral numNodes) + runThreadNet seed (NumSlots numSlots) numCoreNodes (trivialNodeJoinPlan numCoreNodes) + + numCoreNodes = NumCoreNodes $ fromIntegral numNodes traces = 
testOutput.allTraces @@ -334,6 +340,53 @@ prop_leios seed = | (s1, s2) <- zip certifyingBlocks (drop 1 certifyingBlocks) ] +-- | Late-joining node must not crash. +-- +-- 4 nodes, 200 slots. Nodes 0–2 join at slot 0; node 3 joins at a random +-- slot. On the current codebase this crashes in 'resolveLeiosBlock' because +-- the late node receives a CertRB referencing an EB it never saw. +prop_leios_late_join :: Seed -> Property +prop_leios_late_join seed = + -- Cap the join slot at numSlots/4 so the late node always has at least 3/4 + -- of the run to catch up. Otherwise samples near numSlots would fail the + -- chain-consistency assertion for catch-up-bandwidth reasons unrelated to + -- the late-join logic under test. + forAll (choose (1, fromIntegral numSlots `div` 4)) $ \lateJoinSlot -> + let + joinPlan = + NodeJoinPlan $ + Map.fromList + [ (CoreNodeId 0, SlotNo 0) + , (CoreNodeId 1, SlotNo 0) + , (CoreNodeId 2, SlotNo 0) + , (CoreNodeId 3, SlotNo $ fromIntegral (lateJoinSlot :: Int)) + ] + + numCoreNodes = NumCoreNodes 4 + + (testOutput, _) = + runThreadNet seed (NumSlots numSlots) numCoreNodes joinPlan + + nodeChains = + Chain.toOldestFirst . nodeOutputFinalChain <$> testOutput.testOutputNodes + in + -- The simulation must not throw: ChainSel skips a CertRB whose EB + -- closure is missing instead of crashing in resolveLeiosBlock. + conjoin + [ not (null nodeChains) + & counterexample "test output was empty" + , -- All nodes should converge to the same chain. Without the + -- ChainSel re-trigger on EB closure arrival ('ebCompletionRunner'), + -- the late node's chain stays shorter because the filtered + -- CertRBs are never reconsidered. 
+ all (== head (Map.elems nodeChains)) nodeChains + & counterexample "nodes have different chains" + & counterexample ("chain lengths: " <> show (fmap length nodeChains)) + ] + & counterexample ("late join slot: " <> show lateJoinSlot) + where + numSlots = 200 :: Word64 + -- | Independently compute cumulative tx bytes by resolving each block in the -- chain (filling in EB closures from the LeiosDB) and summing individual -- 'sizeTxF' values per transaction. @@ -420,8 +473,9 @@ runThreadNet :: Seed -> NumSlots -> NumCoreNodes -> + NodeJoinPlan -> (TestOutput (CardanoBlock StandardCrypto), ProtocolInfo (CardanoBlock StandardCrypto)) -runThreadNet initSeed numSlots numCoreNodes = +runThreadNet initSeed numSlots numCoreNodes joinPlan = ( runTestNetwork testConfig testConfigB @@ -503,7 +557,7 @@ runThreadNet initSeed numSlots numCoreNodes = { forgeEbbEnv = Nothing , future = EraFinal slotLength shelleyGenesis.sgEpochLength , messageDelay = noCalcMessageDelay - , nodeJoinPlan = trivialNodeJoinPlan numCoreNodes + , nodeJoinPlan = joinPlan , nodeRestarts = noRestarts , txGenExtra = CardanoTxGenExtra diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index e80f057768..e0ce1c78ea 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -165,6 +165,7 @@ import Control.Concurrent.Class.MonadSTM.Strict (readTChan) import qualified Data.ByteString as BS import Data.Map (Map) import qualified Data.Map as Map +import qualified Data.Set as Set import LeiosDemoDb ( LeiosDbConnection , LeiosDbHandle (..) @@ -179,6 +180,7 @@ import LeiosDemoTypes ( ForgedLeiosEb , LeiosOutstanding , LeiosPeerVars + , LeiosPoint (..) , LeiosVote (..) , TraceLeiosKernel (..) 
, VoterId (..) @@ -419,6 +421,25 @@ initNodeKernel getLeiosOutstanding <- MVar.newMVar Leios.emptyLeiosOutstanding -- TODO init from DB getLeiosReady <- MVar.newEmptyMVar + -- Mirror cdbPendingEBs into Leios missingEbBodies with size 0 so that the + -- LeiosFetch client tries to fetch closures for CertRBs that ChainSel is + -- holding back. + lastAppliedPendingEBs <- newTVarIO Map.empty + void $ + forkLinkedThread registry "NodeKernel.pendingEbReconciler" $ + forever $ do + (added, removed) <- atomically $ do + new <- ChainDB.getPendingCertRBs chainDB + old <- readTVar lastAppliedPendingEBs + when (new == old) retry + writeTVar lastAppliedPendingEBs new + pure (Map.difference new old, Map.difference old new) + MVar.modifyMVar_ getLeiosOutstanding $ + pure + . Leios.applyPendingRemoved (Map.keys removed) + . Leios.applyPendingAdded (Map.keys added) + void $ MVar.tryPutMVar getLeiosReady () + void $ forkLinkedThread registry "NodeKernel.leiosFetchLogic" $ do leiosConn <- allocate_ registry (LeiosDb.open leiosDB) LeiosDb.close @@ -427,8 +448,65 @@ initNodeKernel traceWith tracer $ MkTraceLeiosKernel "leiosFetchLogic: wait for leios ready" () <- MVar.takeMVar getLeiosReady iterationStart <- getMonotonicTime + -- Sweep cdbPendingEBs for CertRBs whose EB closure has since + -- become available locally, and re-enqueue them for ChainSel. + -- ebCompletionRunner already does this when a notification fires, + -- but it can miss the window if the closure completes between + -- ChainSel's "is the closure present?" check and its insert into + -- cdbPendingEBs. The sweep covers that race and any other + -- missed-notification scenarios (e.g. subscription startup gap). + -- + -- An inline recheck in ChainSel.hs (the add-block path that + -- inserts into cdbPendingEBs) closes the immediate race window + -- so callers don't have to wait for the next sweep tick; this + -- sweep is the load-bearing fix and also handles cases the + -- inline recheck can't see (e.g. 
closure arrival just after a + -- prior pending-insert). + pending <- atomically $ ChainDB.getPendingCertRBs chainDB + forM_ (Map.toList pending) $ \(leiosPoint, certRBHash) -> do + mayComplete <- LeiosDb.leiosDbQueryCompletedEbByPoint leiosConn leiosPoint + when (isJust mayComplete) $ + void $ ChainDB.addReprocessBlock chainDB leiosPoint certRBHash leiosPeersVars <- MVar.readMVar getLeiosPeersVars offerings <- mapM (MVar.readMVar . Leios.offerings) leiosPeersVars + -- Per-peer set of EB hashes the peer has certified on its + -- selected chain. Used as a fallback peer source in + -- 'choosePeerEb' / 'choosePeerTx' when no peer has actively + -- offered the EB body or closure. + -- + -- Load-bearing invariant: if a peer's candidate fragment + -- contains a CertRB, that peer has the EB closure. This rests + -- on the upstream's ChainSync server only emitting + -- selected-chain headers, the upstream's ChainSel filtering + -- CertRBs with missing closures (cdbPendingEBs), and LeiosDB + -- not GC'ing closures. See the fallback branches in + -- 'choosePeerEb' / 'choosePeerTx' in 'LeiosDemoLogic'. + let + -- For one peer, scan its candidate fragment and collect the + -- hashes of every EB that any header on the candidate + -- certifies. Only the EB hash matters for fetch-decision + -- keying; the slot is discarded. + certifiedEbsFromCandidate csHandle = do + state <- readTVar (cschState csHandle) + pure $ + Set.fromList + -- Singleton-list generator with a refutable pattern: + -- each header that does not certify an EB is silently + -- skipped. + [ pointEbHash + | hwt <- AF.toOldestFirst (csCandidate state) + , Just MkLeiosPoint{pointEbHash} <- + [LedgerDB.certifiedEbFromHeader (hwtHeader hwt)] + ] + candidateCertEbs <- atomically $ do + -- Snapshot of all live ChainSync clients (one handle per + -- upstream peer). + handles <- cschcMap varChainSyncHandles + -- Re-key from 'ConnectionId' to Leios's 'PeerId' newtype. 
+ -- 'mapKeysMonotonic' is safe because 'MkPeerId' is just an + -- order-preserving wrapper. + fmap (Map.mapKeysMonotonic Leios.MkPeerId) $ + forM handles certifiedEbsFromCandidate newDecisions <- MVar.modifyMVar getLeiosOutstanding $ \outstanding -> do -- Filter outstanding work against DB before running fetch iteration. -- This removes EBs and TXs we already have (e.g., from forging or other peers). @@ -438,6 +516,7 @@ initNodeKernel Leios.leiosFetchLogicIteration Leios.demoLeiosFetchStaticEnv offerings + candidateCertEbs filteredOutstanding pure (outstanding', newDecisions) traceWith tracer $ MkTraceLeiosKernel $ "leiosFetchLogic: decided" diff --git a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs index 208dbb5de8..ad78c52171 100644 --- a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs +++ b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs @@ -5,6 +5,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} -- | Block header associated with Praos. @@ -70,7 +71,7 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Word (Word32) import GHC.Generics (Generic) -import LeiosDemoTypes (EbAnnouncement) +import LeiosDemoTypes (EbAnnouncement, LeiosPoint) import NoThunks.Class (AllowThunksIn (..), NoThunks (..)) import Ouroboros.Consensus.Protocol.Praos.VRF (InputVRF) @@ -99,6 +100,10 @@ data HeaderBody crypto = HeaderBody -- ^ protocol version , hbMayEbAnnouncement :: Maybe EbAnnouncement -- ^ Leios EB announcement + , hbMayCertifiedEb :: Maybe LeiosPoint + -- ^ The EB certified by this block's certificate, if any. 
+ -- See CIP-0164 §"Ranking Blocks" for how certified EBs + -- link ranking blocks to their EB transaction closures. } deriving Generic @@ -203,13 +208,14 @@ instance Crypto crypto => EncCBOR (HeaderBody crypto) where , hbOCert , hbProtVer , hbMayEbAnnouncement + , hbMayCertifiedEb } = - let (len, encEbAnnouncement) = case hbMayEbAnnouncement of -- TODO(bladyjoker): This shennanigans is here for backwards compatibility, remove it! - Nothing -> (10, mempty) - Just ebAnnouncement -> - ( 11 - , encCBOR ebAnnouncement - ) + -- TODO(bladyjoker): This shennanigans is here for backwards compatibility, remove it! + let (len, encLeiosFields) = case (hbMayEbAnnouncement, hbMayCertifiedEb) of + (Nothing, Nothing) -> (10, mempty) + (Just ebAnn, Nothing) -> (11, encCBOR ebAnn) + (mayEbAnn, Just certEb) -> + (12, encCBOR mayEbAnn <> encCBOR certEb) in mconcat [ encodeListLen len , encCBOR hbBlockNo @@ -222,7 +228,7 @@ instance Crypto crypto => EncCBOR (HeaderBody crypto) where , encCBOR hbBodyHash , encCBOR hbOCert , encCBOR hbProtVer - , encEbAnnouncement + , encLeiosFields ] instance Crypto crypto => DecCBOR (HeaderBody crypto) where @@ -238,9 +244,11 @@ instance Crypto crypto => DecCBOR (HeaderBody crypto) where hbBodyHash <- decCBOR hbOCert <- unCBORGroup <$> decCBOR hbProtVer <- decCBOR - hbMayEbAnnouncement <- case len of - 10 -> return Nothing - 11 -> Just <$> decCBOR @EbAnnouncement -- TODO(bladyjoker): This shennanigans is here for backwards compatibility, remove it! + -- TODO(bladyjoker): This shennanigans is here for backwards compatibility, remove it! + (hbMayEbAnnouncement, hbMayCertifiedEb) <- case len of + 10 -> return (Nothing, Nothing) + 11 -> (,Nothing) . 
Just <$> decCBOR @EbAnnouncement + 12 -> (,) <$> decCBOR @(Maybe EbAnnouncement) <*> (Just <$> decCBOR @LeiosPoint) _ -> fail $ "Praos HeaderBody CBOR has wrong length: " <> show len return $ HeaderBody @@ -255,6 +263,7 @@ instance Crypto crypto => DecCBOR (HeaderBody crypto) where , hbOCert , hbProtVer , hbMayEbAnnouncement + , hbMayCertifiedEb } encodeHeaderRaw :: diff --git a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs index cae725449e..a36d2c4e33 100644 --- a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs +++ b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs @@ -15,7 +15,7 @@ import Cardano.Protocol.TPraos.BHeader (PrevHash) import Cardano.Protocol.TPraos.OCert (OCert) import Cardano.Slotting.Slot (SlotNo) import Data.Word (Word16, Word32) -import LeiosDemoTypes (EbAnnouncement) +import LeiosDemoTypes (EbAnnouncement, LeiosPoint) import Ouroboros.Consensus.Protocol.Praos.Header (HeaderBody) import Ouroboros.Consensus.Protocol.Praos.VRF (InputVRF) @@ -38,6 +38,7 @@ data HeaderView crypto = HeaderView , hvSignature :: !(SignedKES (KES crypto) (HeaderBody crypto)) -- ^ KES Signature of the header , hvMayEbAnnouncement :: !(Maybe EbAnnouncement) + , hvMayCertifiedEb :: !(Maybe LeiosPoint) } data LedgerView = LedgerView diff --git a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs index e92e1b2c2c..272942d604 100644 --- a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs +++ 
b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs @@ -18,7 +18,11 @@ import Cardano.Slotting.Slot , WithOrigin (At, Origin) ) import qualified Data.ByteString as BS -import LeiosDemoTypes (EbAnnouncement (EbAnnouncement), EbHash (MkEbHash)) +import LeiosDemoTypes + ( EbAnnouncement (EbAnnouncement) + , EbHash (MkEbHash) + , LeiosPoint (MkLeiosPoint) + ) import Ouroboros.Consensus.Protocol.Praos (LeiosState (LeiosState), PraosState (PraosState)) import qualified Ouroboros.Consensus.Protocol.Praos as Praos import Ouroboros.Consensus.Protocol.Praos.Header @@ -62,6 +66,7 @@ instance Praos.PraosCrypto c => Arbitrary (HeaderBody c) where <*> ocert <*> arbitrary <*> arbitrary + <*> arbitrary instance Praos.PraosCrypto c => Arbitrary (Header c) where arbitrary = do @@ -89,5 +94,8 @@ instance Arbitrary PraosState where instance Arbitrary EbAnnouncement where arbitrary = EbAnnouncement <$> (MkEbHash . BS.pack <$> vector 32) <*> arbitrary +instance Arbitrary LeiosPoint where + arbitrary = MkLeiosPoint <$> arbitrary <*> (MkEbHash . 
BS.pack <$> vector 32) + instance Arbitrary LeiosState where arbitrary = pure $ LeiosState Nothing 0 diff --git a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs index f474443a8e..55d3109650 100644 --- a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs +++ b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs @@ -108,6 +108,7 @@ import Ouroboros.Consensus.Protocol.Praos.VRF , vrfLeaderValue ) import Ouroboros.Consensus.Protocol.TPraos (StandardCrypto) +import Test.Consensus.Protocol.Serialisation.Generators () import Test.QuickCheck ( Gen , arbitrary @@ -445,7 +446,8 @@ genHeaderBody context = do hbBodySize <- choose (1000, 90000) hbBodyHash <- genHash (hbOCert, kesPeriod) <- genCert hbSlotNo context - let hbMayEbAnnouncement = Nothing + hbMayEbAnnouncement <- arbitrary + hbMayCertifiedEb <- arbitrary let hbProtVer = protocolVersionZero headerBody = HeaderBody{..} pure $ (headerBody, kesPeriod) diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs index bba89278b8..b931cdbff5 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs @@ -262,9 +262,13 @@ leiosFetchLogicIteration :: Ord pid => LeiosFetchStaticEnv -> Map (PeerId pid) (Set EbHash, Set EbHash) -> + -- | Per-peer certified EB hashes inferred from ChainSync candidate + -- fragments. Used as a fallback peer source when no peer has offered the + -- EB body (e.g. for CertRBs whose closure pre-dates the local node). 
+ Map (PeerId pid) (Set EbHash) -> LeiosOutstanding pid -> (LeiosOutstanding pid, LeiosFetchDecisions pid) -leiosFetchLogicIteration env offerings = +leiosFetchLogicIteration env offerings candidateCertEbs = \acc -> go1 acc emptyLeiosFetchDecisions $ expand $ @@ -330,17 +334,31 @@ leiosFetchLogicIteration env offerings = choosePeerEb :: Set (PeerId pid) -> LeiosOutstanding pid -> EbHash -> Maybe (PeerId pid) choosePeerEb peerIds acc ebHash = - foldr (\a _ -> Just a) Nothing $ - [ peerId - | (peerId, (ebHashes, _ebHashes)) <- - Map.toList $ -- TODO prioritize/shuffle? - (`Map.withoutKeys` peerIds) $ -- not already requested from this peer - offerings - , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) - <= Leios.maxRequestedBytesSizePerPeer env - , -- peer can be sent more requests - ebHash `Set.member` ebHashes -- peer has offered this EB body - ] + case pickFrom (Map.map fst offerings) of + Just peerId -> Just peerId + -- Fallback: peers whose ChainSync candidate fragment contains the + -- CertRB that depends on this EB. + -- + -- Correctness depends on: (1) the upstream's ChainSync server only + -- emits MsgRollForward for selected-chain headers; (2) the upstream's + -- ChainSel filters CertRBs with missing closures via cdbPendingEBs; + -- (3) LeiosDB does not GC closures. Violations of any of these would + -- cause us to dispatch unsatisfiable fetches. + Nothing -> pickFrom candidateCertEbs + where + pickFrom :: Map (PeerId pid) (Set EbHash) -> Maybe (PeerId pid) + pickFrom source = + foldr (\a _ -> Just a) Nothing $ + [ peerId + | (peerId, ebHashes) <- + Map.toList $ -- TODO prioritize/shuffle? 
+ (`Map.withoutKeys` peerIds) $ -- not already requested from this peer + source + , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) + <= Leios.maxRequestedBytesSizePerPeer env + , -- peer can be sent more requests + ebHash `Set.member` ebHashes + ] goTx2 :: LeiosOutstanding pid -> @@ -388,20 +406,30 @@ leiosFetchLogicIteration env offerings = BytesSize -> Maybe (PeerId pid, Map EbHash Int) choosePeerTx peerIds acc txOffsets targetTxBytesSize = - foldr (\a _ -> Just a) Nothing $ - [ (peerId, Map.map fst txOffsets') - | (peerId, (_ebIds, ebIds)) <- - Map.toList $ -- TODO prioritize/shuffle? - (`Map.withoutKeys` peerIds) $ -- not already requested from this peer - offerings - , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) - <= Leios.maxRequestedBytesSizePerPeer env - , -- peer can be sent more requests - let txOffsets' = txOffsets `Map.restrictKeys` ebIds - , case Map.lookupMax txOffsets' of - Nothing -> False - Just (_ebHash, (_txOffset, txBytesSize)) -> targetTxBytesSize == txBytesSize -- peer has offered at least one EB closure that includes this tx with the same size - ] + case pickFrom (Map.map snd offerings) of + Just hit -> Just hit + -- Fallback: peers whose ChainSync candidate fragment contains a CertRB + -- for this EB must have validated the full closure locally, so they + -- also have the txs. See 'choosePeerEb' for the load-bearing invariant + -- chain this depends on. + Nothing -> pickFrom candidateCertEbs + where + pickFrom :: Map (PeerId pid) (Set EbHash) -> Maybe (PeerId pid, Map EbHash Int) + pickFrom source = + foldr (\a _ -> Just a) Nothing $ + [ (peerId, Map.map fst txOffsets') + | (peerId, ebIds) <- + Map.toList $ -- TODO prioritize/shuffle? 
+ (`Map.withoutKeys` peerIds) $ -- not already requested from this peer + source + , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) + <= Leios.maxRequestedBytesSizePerPeer env + , -- peer can be sent more requests + let txOffsets' = txOffsets `Map.restrictKeys` ebIds + , case Map.lookupMax txOffsets' of + Nothing -> False + Just (_ebHash, (_txOffset, txBytesSize)) -> targetTxBytesSize == txBytesSize + ] packRequests :: LeiosFetchStaticEnv -> @@ -561,7 +589,11 @@ msgLeiosBlock ktracer tracer (outstandingVar, readyVar) db peerId req eb = do let MkLeiosPoint _ebSlot ebHash = point do let ebBytesSize' = leiosEbBytesSize eb - when (ebBytesSize' /= ebBytesSize) $ do + -- A 0 expected size signals a ChainSel-driven request (see + -- 'pendingEbReconciler'): we don't know the EB body size up front because + -- 'hbMayCertifiedEb' only carries the 'LeiosPoint'. The hash check below + -- is the authoritative integrity check. + when (ebBytesSize /= 0 && ebBytesSize' /= ebBytesSize) $ do error $ "MsgLeiosBlock size mismatch: " <> show (ebBytesSize', ebBytesSize) let ebHash' = hashLeiosEb eb when (ebHash' /= ebHash) $ do diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs index 312dcd1746..2ef59e4dc1 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs @@ -19,7 +19,8 @@ module LeiosDemoTypes (module LeiosDemoTypes) where import Cardano.Binary (FromCBOR (fromCBOR), ToCBOR, enforceSize, serialize', toCBOR) import qualified Cardano.Crypto.Hash as Hash -import Cardano.Ledger.Binary (DecCBOR, EncCBOR) +import Cardano.Ledger.Binary (DecCBOR (..), EncCBOR (..)) +import qualified Cardano.Ledger.Binary as LedgerBinary import Cardano.Ledger.Core (EraTx, Tx) import Cardano.Prelude (NFData, NonEmpty, toList, toString, (&)) import Cardano.Slotting.Slot (SlotNo (SlotNo)) @@ -43,6 +44,7 @@ import 
Data.IntMap (IntMap) import qualified Data.IntMap as IntMap import Data.Map (Map) import qualified Data.Map as Map +import Data.Maybe (fromMaybe) import Data.Sequence (Seq) import qualified Data.Sequence as Seq import Data.Set (Set) @@ -128,6 +130,17 @@ decodeLeiosPoint = do enforceSize (fromString "LeiosPoint") 2 MkLeiosPoint <$> decode <*> decodeEbHash +instance EncCBOR LeiosPoint where + encCBOR (MkLeiosPoint ebSlot ebHash) = + LedgerBinary.encodeListLen 2 + <> encCBOR ebSlot + <> encCBOR ebHash + +instance DecCBOR LeiosPoint where + decCBOR = do + LedgerBinary.decodeListLenOf 2 + MkLeiosPoint <$> decCBOR <*> decCBOR + -- | Types used in Praos headers data EbAnnouncement = EbAnnouncement { ebAnnouncementHash :: EbHash @@ -298,6 +311,36 @@ emptyLeiosOutstanding = , blockingPerEb = Map.empty } +-- | Insert pending-CertRB entries into 'missingEbBodies' with size 0, +-- without clobbering any existing (offer-supplied) non-zero size. Size 0 +-- is the sentinel for "we don't know the body size because the entry was +-- driven by ChainSel via 'cdbPendingEBs' rather than by an offer". +applyPendingAdded :: + Foldable f => f LeiosPoint -> LeiosOutstanding pid -> LeiosOutstanding pid +applyPendingAdded added outstanding = + outstanding + { missingEbBodies = + foldr + (\point acc -> Map.alter (Just . fromMaybe 0) point acc) + (missingEbBodies outstanding) + added + } + +-- | Drop pending-CertRB entries from 'missingEbBodies'. Only removes +-- entries we own (size 0); offer-supplied entries (non-zero) are +-- preserved. 
+applyPendingRemoved :: + Foldable f => f LeiosPoint -> LeiosOutstanding pid -> LeiosOutstanding pid +applyPendingRemoved removed outstanding = + outstanding + { missingEbBodies = + foldr + (\point acc -> + Map.update (\sz -> if sz == 0 then Nothing else Just sz) point acc) + (missingEbBodies outstanding) + removed + } + prettyLeiosOutstanding :: LeiosOutstanding pid -> String prettyLeiosOutstanding x = unlines $ diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs index 7eeb5716f7..23ce20d61c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -74,8 +74,10 @@ module Ouroboros.Consensus.Storage.ChainDB.API import Control.Monad (void) import Control.ResourceRegistry +import Data.Map.Strict (Map) import Data.Typeable (Typeable) import GHC.Generics (Generic) +import LeiosDemoTypes (LeiosPoint) import Ouroboros.Consensus.Block import Ouroboros.Consensus.HeaderStateHistory ( HeaderStateHistory (..) @@ -158,6 +160,11 @@ data ChainDB m blk = ChainDB -- https://github.com/IntersectMBO/ouroboros-consensus/blob/main/docs/website/contents/for-developers/HandlingBlocksFromTheFuture.md#handling-blocks-from-the-future , chainSelAsync :: m (ChainSelectionPromise m) -- ^ Trigger reprocessing of blocks postponed by the LoE. + , addReprocessBlock :: LeiosPoint -> HeaderHash blk -> m (ChainSelectionPromise m) + -- ^ Re-trigger chain selection for a single CertRB whose EB closure + -- has just become available locally. Removes the block from + -- 'getPendingCertRBs' (keyed by 'LeiosPoint') and re-runs ChainSel + -- against its header (looked up by 'HeaderHash'). 
, getCurrentChain :: STM m (AnchoredFragment (Header blk)) -- ^ Get the current chain fragment -- @@ -383,6 +390,10 @@ data ChainDB m blk = ChainDB , getChainSelStarvation :: STM m ChainSelStarvation -- ^ Whether ChainSel is currently starved, or when was last time it -- stopped being starved. + , getPendingCertRBs :: STM m (Map LeiosPoint (HeaderHash blk)) + -- ^ CertRBs filtered from ChainSel candidates because their EB closure + -- is not yet available locally. Keyed by the missing EB's 'LeiosPoint', + -- value is the CertRB's header hash. , getLedgerTablesAtFor :: Point blk -> LedgerTables (ExtLedgerState blk) KeysMK -> diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index 72668e65d6..fb61b4e09c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -229,6 +229,7 @@ openDBInternal leiosDb args launchBgTasks = runWithTempRegistry $ do chainSelFuse <- newFuse "chain selection" chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs) varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing + varPendingEBs <- newTVarIO Map.empty let env = CDB @@ -254,12 +255,18 @@ openDBInternal leiosDb args launchBgTasks = runWithTempRegistry $ do , cdbChainSelQueue = chainSelQueue , cdbLoE = Args.cdbsLoE cdbSpecificArgs , cdbChainSelStarvation = varChainSelStarvation + , cdbPendingEBs = varPendingEBs } h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env let chainDB = API.ChainDB { addBlockAsync = getEnv2 h ChainSel.addBlockAsync , chainSelAsync = getEnv h ChainSel.triggerChainSelectionAsync + , addReprocessBlock = + getEnv2 h $ \env' -> + addReprocessBlock + (TraceAddBlockEvent >$< cdbTracer env') + (cdbChainSelQueue env') , getCurrentChain = getEnvSTM h Query.getCurrentChain , 
getCurrentChainWithTime = getEnvSTM h Query.getCurrentChainWithTime , getTipBlock = getEnv h Query.getTipBlock @@ -273,6 +280,7 @@ openDBInternal leiosDb args launchBgTasks = runWithTempRegistry $ do , newFollower = Follower.newFollower h , getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock , getChainSelStarvation = getEnvSTM h Query.getChainSelStarvation + , getPendingCertRBs = getEnvSTM h Query.getPendingCertRBs , closeDB = closeDB h , isOpen = isOpen h , getCurrentLedger = getEnvSTM h Query.getCurrentLedger diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index ac369c143f..a94adea406 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -56,7 +56,8 @@ import Data.Void (Void) import Data.Word import GHC.Generics (Generic) import GHC.Stack (HasCallStack) -import LeiosDemoDb (LeiosDbHandle (..)) +import Control.Concurrent.Class.MonadSTM.Strict (readTChan) +import LeiosDemoDb (LeiosDbConnection (..), LeiosDbHandle (..), LeiosEbNotification (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.HardFork.Abstract @@ -72,6 +73,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel ) import Ouroboros.Consensus.Storage.ChainDB.Impl.Types import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB +import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB import Ouroboros.Consensus.Util @@ -92,6 +94,7 @@ launchBgTasks :: , BlockSupportsDiffusionPipelining blk , InspectLedger blk , HasHardForkHistory blk + , ResolveLeiosBlock blk ) => 
LeiosDbHandle m -> ChainDbEnv m blk -> @@ -102,6 +105,9 @@ launchBgTasks leiosDb cdb@CDB{..} replayed = do !addBlockThread <- launch "ChainDB.addBlockRunner" $ addBlockRunner leiosDb cdbChainSelFuse cdb + !ebCompletionThread <- + launch "ChainDB.ebCompletionRunner" $ + ebCompletionRunner leiosDb cdb gcSchedule <- newGcSchedule !gcThread <- launch "ChainDB.gcScheduleRunner" $ @@ -112,7 +118,7 @@ launchBgTasks leiosDb cdb@CDB{..} replayed = do copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse atomically $ writeTVar cdbKillBgThreads $ - sequence_ [addBlockThread, gcThread, copyAndSnapshotThread] + sequence_ [addBlockThread, ebCompletionThread, gcThread, copyAndSnapshotThread] where launch :: String -> m Void -> m (m ()) launch = fmap cancelThread .: forkLinkedThread cdbRegistry @@ -520,6 +526,7 @@ addBlockRunner :: , InspectLedger blk , HasHardForkHistory blk , HasCallStack + , ResolveLeiosBlock blk ) => LeiosDbHandle m -> Fuse m -> @@ -539,6 +546,8 @@ addBlockRunner leiosDb fuse cdb@CDB{..} = do case message of ChainSelReprocessLoEBlocks varProcessed -> void $ tryPutTMVar varProcessed () + ChainSelReprocessBlock _ _ varProcessed -> + void $ tryPutTMVar varProcessed () ChainSelAddBlock BlockToAdd{varBlockWrittenToDisk, varBlockProcessed} -> do _ <- tryPutTMVar @@ -555,6 +564,8 @@ addBlockRunner leiosDb fuse cdb@CDB{..} = do lift $ case message of ChainSelReprocessLoEBlocks _ -> trace PoppedReprocessLoEBlocksFromQueue + ChainSelReprocessBlock _ hash _ -> + trace $ PoppedReprocessBlockFromQueue hash ChainSelAddBlock BlockToAdd{blockToAdd} -> trace $ PoppedBlockFromQueue $ @@ -565,3 +576,37 @@ addBlockRunner leiosDb fuse cdb@CDB{..} = do ) where starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent + +-- | Monitor LeiosDB for EB completions and re-trigger ChainSel for +-- CertRBs whose EB closure was previously missing. 
+-- +-- When a CertRB arrives with a missing EB closure, 'chainSelSync' +-- records it in 'cdbPendingEBs' and skips chain selection. This +-- thread watches for the closure to appear in the LeiosDB, then +-- enqueues a 'ChainSelReprocessBlock' so the CertRB can be selected. +ebCompletionRunner :: + IOLike m => + LeiosDbHandle m -> + ChainDbEnv m blk -> + m Void +ebCompletionRunner leiosDb cdb = do + notifChan <- subscribeEbNotifications leiosDb + bracket (open leiosDb) close $ \leiosConn -> forever $ do + notif <- atomically $ readTChan notifChan + let leiosPoint = case notif of + AcquiredEb point _ -> point + AcquiredEbTxs point -> point + pending <- atomically $ readTVar (cdbPendingEBs cdb) + case Map.lookup leiosPoint pending of + Nothing -> pure () + Just certRBHash -> do + mayComplete <- leiosDbQueryCompletedEbByPoint leiosConn leiosPoint + case mayComplete of + Nothing -> pure () + Just _ -> + void $ + addReprocessBlock + (contramap TraceAddBlockEvent (cdbTracer cdb)) + (cdbChainSelQueue cdb) + leiosPoint + certRBHash diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs index 7b875fa320..dfdece60d6 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs @@ -24,7 +24,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel import Cardano.Ledger.BaseTypes (unNonZero) import Control.Exception (assert) -import Control.Monad (forM, forM_, when) +import Control.Monad (forM, forM_, unless, when) import Control.Monad.Except () import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.State.Strict @@ -43,7 +43,7 @@ import Data.Maybe.Strict (StrictMaybe (..), strictMaybeToMaybe) import Data.Set (Set) import qualified Data.Set as Set import 
GHC.Stack (HasCallStack) -import LeiosDemoDb (LeiosDbConnection) +import LeiosDemoDb (LeiosDbConnection (leiosDbQueryCompletedEbByPoint)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Fragment.Diff (ChainDiff (..)) @@ -348,6 +348,7 @@ chainSelSync :: , InspectLedger blk , HasHardForkHistory blk , HasCallStack + , ResolveLeiosBlock blk ) => LeiosDbConnection m -> ChainDbEnv m blk -> @@ -388,6 +389,14 @@ chainSelSync leiosDb cdb@CDB{..} (ChainSelReprocessLoEBlocks varProcessed) = do for_ loeHeaders $ \hdr -> chainSelectionForBlock leiosDb cdb BlockCache.empty hdr noPunishment lift $ atomically $ putTMVar varProcessed () +-- Re-trigger chain selection for a CertRB whose EB closure has arrived. +-- Remove it from the pending set so 'chainSelectionForBlock' no longer +-- filters it out, then run chain selection for that block. +chainSelSync leiosDb cdb (ChainSelReprocessBlock point hash varProcessed) = do + lift $ atomically $ modifyTVar (cdbPendingEBs cdb) $ Map.delete point + hdr <- lift $ VolatileDB.getKnownBlockComponent (cdbVolatileDB cdb) GetHeader hash + chainSelectionForBlock leiosDb cdb BlockCache.empty hdr noPunishment + lift $ atomically $ putTMVar varProcessed () chainSelSync leiosDb cdb@CDB{..} (ChainSelAddBlock BlockToAdd{blockToAdd = b, ..}) = do (isMember, invalid, curChain) <- lift $ @@ -426,7 +435,47 @@ chainSelSync leiosDb cdb@CDB{..} (ChainSelAddBlock BlockToAdd{blockToAdd = b, .. encloseWith (traceEv >$< addBlockTracer) $ VolatileDB.putBlock cdbVolatileDB b lift $ deliverWrittenToDisk True - chainSelectionForBlock leiosDb cdb (BlockCache.singleton b) hdr blockPunish + + -- If this block certifies an EB whose closure we don't have yet, + -- record it as pending and skip chain selection — it would be + -- filtered out anyway. ChainSel will be re-triggered by + -- 'ebCompletionRunner' (Background.hs) when the EB closure + -- arrives. 
+ -- + -- After inserting into 'cdbPendingEBs', re-query the LeiosDb: + -- the closure may have completed between the first query and + -- the insert, in which case 'ebCompletionRunner' fired against + -- an empty pending set and dropped its notification. If we hit + -- that window, remove the entry and fall through to chain + -- selection in-process — the in-place removal preserves the + -- "every removal pairs with a ChainSel run" property. + -- + -- A broader sweep in 'leiosFetchLogic' (NodeKernel.hs) covers + -- this race and other missed-notification scenarios on its + -- normal iteration cadence; this inline recheck is a local + -- optimization that closes the immediate window without + -- waiting for the next sweep tick. + isPending <- lift $ case certifiedEbFromHeader hdr of + Nothing -> pure False + Just leiosPoint -> do + mayClosure <- leiosDbQueryCompletedEbByPoint leiosDb leiosPoint + case mayClosure of + Just _ -> pure False + Nothing -> do + atomically $ + modifyTVar cdbPendingEBs $ + Map.insert leiosPoint (headerHash hdr) + mayClosure' <- leiosDbQueryCompletedEbByPoint leiosDb leiosPoint + case mayClosure' of + Just _ -> do + atomically $ + modifyTVar cdbPendingEBs $ + Map.delete leiosPoint + pure False + Nothing -> pure True + + unless isPending $ + chainSelectionForBlock leiosDb cdb (BlockCache.singleton b) hdr blockPunish newTip <- lift $ atomically $ Query.getTipPoint cdb @@ -539,14 +588,15 @@ chainSelectionForBlock :: InvalidBlockPunishment m -> Electric m () chainSelectionForBlock leiosDb cdb@CDB{..} blockCache hdr punish = electric $ withRegistry $ \rr -> do - (invalid, succsOf, lookupBlockInfo, curChain, tipPoint) <- + (invalid, succsOf, lookupBlockInfo, curChain, tipPoint, pendingHashes) <- atomically $ - (,,,,) + (,,,,,) <$> (forgetFingerprint <$> readTVar cdbInvalid) <*> VolatileDB.filterByPredecessor cdbVolatileDB <*> VolatileDB.getBlockInfo cdbVolatileDB <*> Query.getCurrentChain cdb <*> Query.getTipPoint cdb + <*> (Set.fromList . 
Map.elems <$> readTVar cdbPendingEBs) -- This is safe: the LedgerDB tip doesn't change in between the previous -- atomically block and this call to 'withTipForker'. LedgerDB.withTipForker cdbLedgerDB rr $ \curForker -> do @@ -561,9 +611,13 @@ chainSelectionForBlock leiosDb cdb@CDB{..} blockCache hdr punish = electric $ wi immBlockNo :: WithOrigin BlockNo immBlockNo = AF.anchorBlockNo curChain - -- Let these two functions ignore invalid blocks - lookupBlockInfo' = ignoreInvalid cdb invalid lookupBlockInfo - succsOf' = ignoreInvalidSuc cdb invalid succsOf + -- Let these two functions ignore invalid blocks and pending CertRBs + lookupBlockInfo' = + ignorePending pendingHashes $ + ignoreInvalid cdb invalid lookupBlockInfo + succsOf' = + Set.filter (`Set.notMember` pendingHashes) + . ignoreInvalidSuc cdb invalid succsOf -- The preconditions assert (isJust $ lookupBlockInfo (headerHash hdr)) $ return () @@ -1416,3 +1470,14 @@ ignoreInvalidSuc :: (ChainHash blk -> Set (HeaderHash blk)) ignoreInvalidSuc _ invalid succsOf = Set.filter (`Map.notMember` invalid) . succsOf + +-- | Wrap a lookup function so that pending CertRBs (whose EB closure +-- hasn't arrived yet) are invisible to chain selection. 
+ignorePending :: + Ord h => + Set h -> + (h -> Maybe a) -> + (h -> Maybe a) +ignorePending pending getter hash + | Set.member hash pending = Nothing + | otherwise = getter hash diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index fcc148b6b1..26f46e6100 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -30,12 +30,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query , getAnyKnownBlock , getAnyKnownBlockComponent , getChainSelStarvation + , getPendingCertRBs ) where import Cardano.Ledger.BaseTypes (unNonZero) import Control.ResourceRegistry (ResourceRegistry) +import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.Set as Set +import LeiosDemoTypes (LeiosPoint) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.HeaderStateHistory @@ -194,6 +197,13 @@ getChainSelStarvation :: STM m ChainSelStarvation getChainSelStarvation CDB{..} = readTVar cdbChainSelStarvation +getPendingCertRBs :: + forall m blk. + IOLike m => + ChainDbEnv m blk -> + STM m (Map LeiosPoint (HeaderHash blk)) +getPendingCertRBs cdb = readTVar (cdbPendingEBs cdb) + getIsValid :: forall m blk. 
(IOLike m, HasHeader blk) => diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index 37ef98fe9d..404621366d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -55,6 +55,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types , ChainSelMessage (..) , ChainSelQueue -- opaque , addBlockToAdd + , addReprocessBlock , addReprocessLoEBlocks , closeChainSelQueue , getChainSelMessage @@ -92,6 +93,7 @@ import Data.Typeable import Data.Void (Void) import Data.Word (Word64) import GHC.Generics (Generic) +import LeiosDemoTypes (LeiosPoint) import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config @@ -350,6 +352,11 @@ data ChainDbEnv m blk = CDB , cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation) -- ^ Information on the last starvation of ChainSel, whether ongoing or -- ended recently. + , cdbPendingEBs :: !(StrictTVar m (Map LeiosPoint (HeaderHash blk))) + -- ^ CertRBs whose EB closure hasn't been fetched yet. Keyed by the + -- missing EB's 'LeiosPoint', value is the CertRB's header hash. + -- ChainSel filters these out of successor candidates until the EB + -- closure arrives. } deriving Generic @@ -547,6 +554,19 @@ data ChainSelMessage m blk ChainSelReprocessLoEBlocks -- | Used for 'ChainSelectionPromise'. !(StrictTMVar m ()) + | -- | Reprocess a single block whose EB closure has arrived. + -- Used by the late-join mechanism: when a CertRB's missing EB + -- closure becomes available, this message re-triggers ChainSel + -- for that specific block. 
+ -- + -- The 'LeiosPoint' is the missing EB's point, used as the key into + -- 'cdbPendingEBs'; the 'HeaderHash' is the CertRB itself, used to + -- look the header up in the VolatileDB. + ChainSelReprocessBlock + !LeiosPoint + !(HeaderHash blk) + -- Used for 'ChainSelectionPromise'. + !(StrictTMVar m ()) -- | Create a new 'ChainSelQueue' with the given size. newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk) @@ -606,6 +626,24 @@ addReprocessLoEBlocks tracer ChainSelQueue{varChainSelQueue} = do ChainSelReprocessLoEBlocks varProcessed return $ ChainSelectionPromise waitUntilRan +-- | Re-trigger chain selection for a single block whose EB closure has +-- arrived. Modelled on 'addReprocessLoEBlocks'. +addReprocessBlock :: + IOLike m => + Tracer m (TraceAddBlockEvent blk) -> + ChainSelQueue m blk -> + LeiosPoint -> + HeaderHash blk -> + m (ChainSelectionPromise m) +addReprocessBlock tracer ChainSelQueue{varChainSelQueue} point hash = do + varProcessed <- newEmptyTMVarIO + let waitUntilRan = atomically $ readTMVar varProcessed + traceWith tracer $ AddedReprocessBlockToQueue hash + atomically $ + writeTBQueue varChainSelQueue $ + ChainSelReprocessBlock point hash varProcessed + return $ ChainSelectionPromise waitUntilRan + -- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the -- queue is empty; in that case, reports the starvation (and its end) via the -- given tracer. @@ -643,6 +681,7 @@ getChainSelMessage starvationTracer starvationVar chainSelQueue = traceWith starvationTracer $ ChainSelStarvation (FallingEdgeWith pt) atomically . writeTVar starvationVar . ChainSelStarvationEndedAt =<< getMonotonicTime ChainSelReprocessLoEBlocks{} -> pure () + ChainSelReprocessBlock{} -> pure () -- TODO Can't use tryReadTBQueue from io-classes because it is broken for IOSim -- (but not for IO). 
https://github.com/input-output-hk/io-sim/issues/195 @@ -664,6 +703,7 @@ closeChainSelQueue ChainSelQueue{varChainSelQueue = queue} = do blockAdd = \case ChainSelAddBlock ab -> Just ab ChainSelReprocessLoEBlocks _ -> Nothing + ChainSelReprocessBlock _ _ _ -> Nothing -- | To invoke when the given 'ChainSelMessage' has been processed by ChainSel. -- This is used to remove the respective point from the multiset of points in @@ -678,6 +718,8 @@ processedChainSelMessage ChainSelQueue{varChainSelPoints} = \case modifyTVar varChainSelPoints $ MultiSet.delete (blockRealPoint blk) ChainSelReprocessLoEBlocks{} -> pure () + ChainSelReprocessBlock{} -> + pure () -- | Return a function to test the membership memberChainSelQueue :: @@ -829,6 +871,11 @@ data TraceAddBlockEvent blk AddedReprocessLoEBlocksToQueue | -- | ChainSel will reprocess blocks that were postponed by the LoE. PoppedReprocessLoEBlocksFromQueue + | -- | A reprocess-block message was added to the queue, requesting ChainSel + -- to re-run for a CertRB whose EB closure has arrived. + AddedReprocessBlockToQueue !(HeaderHash blk) + | -- | ChainSel will reprocess the given CertRB. 
+ PoppedReprocessBlockFromQueue !(HeaderHash blk) | -- | A block was added to the Volatile DB AddedBlockToVolatileDB (RealPoint blk) BlockNo IsEBB Enclosing | -- | The block fits onto the current chain, we'll try to use it to extend diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs index b7cbecc549..c07a2062bb 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs @@ -74,6 +74,7 @@ import qualified Data.Set as Set import Data.Word import GHC.Generics import LeiosDemoDb (LeiosDbConnection) +import LeiosDemoTypes (LeiosPoint) import NoThunks.Class import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config @@ -441,6 +442,12 @@ class ResolveLeiosBlock blk where Monad m => LeiosDbConnection m -> HeaderState blk -> blk -> m blk resolveLeiosBlock _ _ blk = return blk + -- | Extract the certified EB point from a header, if this block + -- certifies an EB. Used by ChainSel to defer blocks whose EB + -- closure hasn't been fetched yet. + certifiedEbFromHeader :: Header blk -> Maybe LeiosPoint + certifiedEbFromHeader _ = Nothing + -- | Apply blocks to the given forker applyBlock :: forall m bm c l blk. 
diff --git a/ouroboros-consensus/test/consensus-test/Test/LeiosDemoTypes.hs b/ouroboros-consensus/test/consensus-test/Test/LeiosDemoTypes.hs index aa36873fa4..7d14e06f06 100644 --- a/ouroboros-consensus/test/consensus-test/Test/LeiosDemoTypes.hs +++ b/ouroboros-consensus/test/consensus-test/Test/LeiosDemoTypes.hs @@ -1,12 +1,20 @@ module Test.LeiosDemoTypes (tests) where import Cardano.Binary (serialize') +import Cardano.Slotting.Slot (SlotNo (SlotNo)) import qualified Data.ByteString as BS +import qualified Data.Map.Strict as Map import qualified Data.Vector as V import LeiosDemoTypes ( BytesSize + , EbHash (..) , LeiosEb (..) + , LeiosOutstanding (..) + , LeiosPoint (..) , TxHash (..) + , applyPendingAdded + , applyPendingRemoved + , emptyLeiosOutstanding , encodeLeiosEb , leiosEbBytesSize , maxTxsPerEb @@ -15,6 +23,7 @@ import Test.QuickCheck ( Gen , Property , chooseInt + , conjoin , counterexample , forAll , frequency @@ -29,8 +38,40 @@ tests = testGroup "LeiosDemoTypes" [ testProperty "leiosEbBytesSize consistent with encodeLeiosEb" prop_ebBytesSizeConsistent + , testProperty "reconciler: pending-only insert/remove round-trip" prop_reconciler_pending_only + , testProperty "reconciler: offer-supplied entry survives pending add/remove" prop_reconciler_offer_wins ] +-- | A pending CertRB whose body size is unknown gets inserted at size 0 +-- and is then dropped cleanly when it leaves the pending set. +prop_reconciler_pending_only :: Property +prop_reconciler_pending_only = + let point = MkLeiosPoint (SlotNo 42) (MkEbHash (BS.replicate 32 0)) + afterAdd = applyPendingAdded [point] (emptyLeiosOutstanding :: LeiosOutstanding ()) + afterRemove = applyPendingRemoved [point] afterAdd + in conjoin + [ Map.lookup point (missingEbBodies afterAdd) === Just 0 + , Map.lookup point (missingEbBodies afterRemove) === Nothing + ] + +-- | An offer that arrived first with a real size must not be clobbered by +-- a pending add and must survive a pending remove. 
This is the invariant +-- that keeps the fetch byte budget accurate. +prop_reconciler_offer_wins :: Property +prop_reconciler_offer_wins = + let point = MkLeiosPoint (SlotNo 17) (MkEbHash (BS.replicate 32 1)) + realSize = 1234 + start = + (emptyLeiosOutstanding :: LeiosOutstanding ()) + { missingEbBodies = Map.singleton point realSize + } + afterAdd = applyPendingAdded [point] start + afterRemove = applyPendingRemoved [point] afterAdd + in conjoin + [ Map.lookup point (missingEbBodies afterAdd) === Just realSize + , Map.lookup point (missingEbBodies afterRemove) === Just realSize + ] + -- | Minimum tx size as per the ASSUMPTION in 'leiosEbBytesSize'. minTxBytesSize :: Int minTxBytesSize = 55