diff --git a/ouroboros-consensus-cardano/src/byron/Ouroboros/Consensus/Byron/Ledger/Block.hs b/ouroboros-consensus-cardano/src/byron/Ouroboros/Consensus/Byron/Ledger/Block.hs index eb1ec966d1..55bb5b08e7 100644 --- a/ouroboros-consensus-cardano/src/byron/Ouroboros/Consensus/Byron/Ledger/Block.hs +++ b/ouroboros-consensus-cardano/src/byron/Ouroboros/Consensus/Byron/Ledger/Block.hs @@ -56,7 +56,7 @@ import Ouroboros.Consensus.Block import qualified Ouroboros.Consensus.Byron.EBBs as EBBs import Ouroboros.Consensus.Byron.Ledger.Conversions import Ouroboros.Consensus.Byron.Ledger.Orphans () -import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) +import Ouroboros.Consensus.Storage.LedgerDB (IsCertRB (..), ResolveLeiosBlock (..)) import Ouroboros.Consensus.Util (ShowProxy (..)) import Ouroboros.Consensus.Util.Condense import Ouroboros.Network.SizeInBytes (SizeInBytes) @@ -105,7 +105,9 @@ instance Condense ByronBlock where instance ShowProxy ByronBlock -- | Default 'ResolveLeiosBlock' — Byron blocks never carry Leios certs. -instance ResolveLeiosBlock ByronBlock +instance ResolveLeiosBlock ByronBlock where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing instance NFData ByronBlock where rnf ByronBlock{byronBlockRaw, byronBlockSlotNo, byronBlockHash} = diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs index b1b58f7644..0a85acdae0 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs @@ -238,7 +238,7 @@ import Ouroboros.Consensus.Protocol.TPraos (TPraos) import Ouroboros.Consensus.Shelley.Eras import Ouroboros.Consensus.Shelley.Ledger (ShelleyBlock) import Ouroboros.Consensus.Shelley.Ledger.Block (ShelleyCompatible) -import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock (..)) +import Ouroboros.Consensus.Storage.LedgerDB (IsCertRB (..), ResolveLeiosBlock (..)) import Ouroboros.Consensus.TypeFamilyWrappers {------------------------------------------------------------------------------- @@ -1544,3 +1544,11 @@ instance headerLeiosAnnouncement hdr = case hdr of HeaderDijkstra dHdr -> headerLeiosAnnouncement dHdr _ -> Nothing + + headerIsCertRB hdr = case hdr of + HeaderDijkstra dHdr -> headerIsCertRB dHdr + _ -> NotCertRB + + headerEbAnnouncement hdr = case hdr of + HeaderDijkstra dHdr -> headerEbAnnouncement dHdr + _ -> Nothing diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/HFEras.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/HFEras.hs index 4806dd7323..628cdaa99d 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/HFEras.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/HFEras.hs @@ -40,7 +40,7 @@ import Ouroboros.Consensus.Shelley.Ledger.Protocol () import Ouroboros.Consensus.Shelley.Protocol.Praos () import Ouroboros.Consensus.Shelley.Protocol.TPraos () import Ouroboros.Consensus.Shelley.ShelleyHFC () -import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) +import Ouroboros.Consensus.Storage.LedgerDB (IsCertRB (..), ResolveLeiosBlock (..)) {------------------------------------------------------------------------------- Hard fork eras @@ -115,9 +115,21 @@ instance Praos.PraosCrypto c => ShelleyCompatible (Praos c) DijkstraEra lives in "Ouroboros.Consensus.Shelley.Ledger.Ledger". -------------------------------------------------------------------------------} -instance ResolveLeiosBlock (ShelleyBlock (TPraos c) ShelleyEra) -instance ResolveLeiosBlock (ShelleyBlock (TPraos c) AllegraEra) -instance ResolveLeiosBlock (ShelleyBlock (TPraos c) MaryEra) -instance ResolveLeiosBlock (ShelleyBlock (TPraos c) AlonzoEra) -instance ResolveLeiosBlock (ShelleyBlock (Praos c) BabbageEra) -instance ResolveLeiosBlock (ShelleyBlock (Praos c) ConwayEra) +instance ResolveLeiosBlock (ShelleyBlock (TPraos c) ShelleyEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock (ShelleyBlock (TPraos c) AllegraEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock (ShelleyBlock (TPraos c) MaryEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock (ShelleyBlock (TPraos c) AlonzoEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock (ShelleyBlock (Praos c) BabbageEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock (ShelleyBlock (Praos c) ConwayEra) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs index 2978329324..604ebdb30f 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Forge.hs @@ -36,6 +36,7 @@ import LeiosDemoDb import LeiosDemoTypes ( EbAnnouncement (..) , ForgedLeiosEb (..) + , IsCertRB (..) , LeiosPoint (..) , TraceLeiosKernel (..) , forgeLeiosEb @@ -97,6 +98,7 @@ forgeShelleyBlock hotKey cbl ForgeBlockArgs{..} = do actualBodySize protocolVersion mayEbAnn + (case mayLeiosCert of SJust _ -> CertRB; SNothing -> NotCertRB) let blk = mkShelleyBlock $ SL.Block hdr body return $ assert (verifyBlockIntegrity (configSlotsPerKESPeriod $ configConsensus fbConfig) blk) $ @@ -203,7 +205,7 @@ forgeShelleyBlock hotKey cbl ForgeBlockArgs{..} = do , mempoolRestMeasure = ByteSize32 0 } leiosDbInsertEbPoint fbLeiosDb (forgedEb.point) ebSize - leiosDbInsertEbBody fbLeiosDb (forgedEb.point) (forgedEb.body) + void $ leiosDbInsertEbBody fbLeiosDb (forgedEb.point) (forgedEb.body) void $ leiosDbInsertTxs fbLeiosDb (forgedEb.txClosure) traceWith fbLeiosTracer $ TraceLeiosBlockStored diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs index 0f0ba90e3f..cf8f60697e 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Ledger.hs @@ -142,7 +142,7 @@ import Ouroboros.Consensus.Protocol.Ledger.Util (isNewEpoch) import Ouroboros.Consensus.Protocol.Praos (Praos, PraosState (..)) import Ouroboros.Consensus.Protocol.Praos.Header ( Header (Header, headerBody) - , HeaderBody (hbLeiosEbAnnouncement, hbSlotNo) + , HeaderBody (hbIsCertRB, hbLeiosEbAnnouncement, hbSlotNo) ) import Ouroboros.Consensus.Shelley.Ledger.Block import Ouroboros.Consensus.Shelley.Ledger.Config @@ -1045,6 +1045,12 @@ instance annBody :: HeaderBody c Header{headerBody = annBody} = shelleyHeaderRaw hdr + headerIsCertRB hdr = + hbIsCertRB (headerBody (shelleyHeaderRaw hdr)) + + headerEbAnnouncement hdr = + strictMaybeToMaybe $ hbLeiosEbAnnouncement (headerBody (shelleyHeaderRaw hdr)) + -- | Deserialise a transaction supplied as Leios-stored bytes. deserialiseLeiosTx :: forall era. ShelleyBasedEra era => BS.ByteString -> Tx TopTx era deserialiseLeiosTx bs = diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs index 58bc3f32a3..8c4a240cd9 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Abstract.hs @@ -50,7 +50,7 @@ import Data.Kind (Type) import Data.Typeable (Typeable) import Data.Word (Word64) import GHC.Generics (Generic) -import LeiosDemoTypes (EbAnnouncement) +import LeiosDemoTypes (EbAnnouncement, IsCertRB) import NoThunks.Class (NoThunks) import Numeric.Natural (Natural) import Ouroboros.Consensus.Protocol.Abstract @@ -169,6 +169,8 @@ class ProtocolHeaderSupportsKES proto where ProtVer -> -- | Optional Leios EB announcement. Only used by Praos. Maybe EbAnnouncement -> + -- | Whether this RB certifies a previously-announced EB + IsCertRB -> m (ShelleyProtocolHeader proto) -- | Extract the most recently announced (and not yet certified) Leios EB diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs index fb4a091e18..b9bb027dcd 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/Praos.hs @@ -145,7 +145,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (Praos c) where currentKesPeriod - startOfKesPeriod | otherwise = 0 - mkHeader hk cbl il slotNo blockNo prevHash bbHash sz protVer mayEbAnn = do + mkHeader hk cbl il slotNo blockNo prevHash bbHash sz protVer mayEbAnn isCertRB = do PraosFields{praosSignature, praosToSign} <- forgePraosFields hk cbl il mkBhBodyBytes pure $ Header praosToSign praosSignature where @@ -168,6 +168,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (Praos c) where , hbOCert = praosToSignOCert , hbProtVer = protVer , hbLeiosEbAnnouncement = maybe SNothing SJust mayEbAnn + , hbIsCertRB = isCertRB } protocolStateLeiosInfo _ cs = @@ -187,6 +188,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsProtocol (Praos c) where , hvSlotNo = hbSlotNo headerBody , hvSigned = headerBody , hvSignature = headerSig + , hvIsCertRB = hbIsCertRB headerBody } pHeaderIssuer = hbVk . headerBody pHeaderIssueNo = SL.ocertN . hbOCert . headerBody diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs index b8a616b954..57609802be 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Protocol/TPraos.hs @@ -90,7 +90,7 @@ instance PraosCrypto c => ProtocolHeaderSupportsKES (TPraos c) where currentKesPeriod - startOfKesPeriod | otherwise = 0 - mkHeader hotKey canBeLeader isLeader curSlot curNo prevHash bbHash actualBodySize protVer _mayEbAnn = do + mkHeader hotKey canBeLeader isLeader curSlot curNo prevHash bbHash actualBodySize protVer _mayEbAnn _isCertRB = do TPraosFields{tpraosSignature, tpraosToSign} <- forgeTPraosFields hotKey canBeLeader isLeader mkBhBody pure $ SL.BHeader tpraosToSign tpraosSignature diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs b/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs index 3c9fb4b684..274e153a14 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs @@ -120,7 +120,9 @@ type ShelleyBasedHardForkBlock proto1 era1 proto2 era2 = -- Real-era resolution (the Dijkstra splice) lives in -- "Ouroboros.Consensus.Cardano.Block"; these test stacks never enter -- Dijkstra. -instance ResolveLeiosBlock (ShelleyBasedHardForkBlock proto1 era1 proto2 era2) +instance ResolveLeiosBlock (ShelleyBasedHardForkBlock proto1 era1 proto2 era2) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing {------------------------------------------------------------------------------- Pattern synonyms, for encapsulation and legibility diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBImmutaliser/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBImmutaliser/Run.hs index 1739fe7ad0..5211175817 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBImmutaliser/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBImmutaliser/Run.hs @@ -52,6 +52,7 @@ import Ouroboros.Consensus.Storage.ImmutableDB , tipToPoint ) import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB +import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) import Ouroboros.Consensus.Storage.VolatileDB ( VolatileDB , VolatileDbArgs (..) @@ -108,6 +109,7 @@ withDBs :: , ImmutableDB.ImmutableDbSerialiseConstraints blk , VolatileDB.VolatileDbSerialiseConstraints blk , NodeInitStorage blk + , ResolveLeiosBlock blk ) => TopLevelConfig blk -> ResourceRegistry m -> diff --git a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs index d444cfaf5e..c1eed128a6 100644 --- a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs +++ b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Examples.hs @@ -37,6 +37,7 @@ import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Map as Map import Data.Maybe.Strict (StrictMaybe (..)) import qualified Data.Set as Set +import LeiosDemoTypes (IsCertRB (..)) import Lens.Micro import Ouroboros.Consensus.Block import Ouroboros.Consensus.HeaderValidation @@ -287,6 +288,7 @@ fromShelleyLedgerExamplesPraos , hbOCert = SL.bheaderOCert bhBody , hbProtVer = SL.bprotver bhBody , hbLeiosEbAnnouncement = SNothing + , hbIsCertRB = NotCertRB } hSig = coerce bhSig hash = ShelleyHash $ SL.unHashHeader pleHashHeader diff --git a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs index 4f128b4773..66a00bf413 100644 --- a/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs +++ b/ouroboros-consensus-cardano/src/unstable-shelley-testlib/Test/Consensus/Shelley/Generators.hs @@ -21,6 +21,7 @@ import Cardano.Slotting.EpochInfo import Control.Monad (replicateM) import Data.Coerce (coerce) import Data.Maybe.Strict (StrictMaybe (..)) +import LeiosDemoTypes (IsCertRB (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.HeaderValidation import Ouroboros.Consensus.Ledger.Abstract @@ -139,6 +140,7 @@ instance , Praos.hbOCert = SL.bheaderOCert bhBody , Praos.hbProtVer = SL.bprotver bhBody , Praos.hbLeiosEbAnnouncement = SNothing + , Praos.hbIsCertRB = NotCertRB } hSig = coerce bhSig diff --git a/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs b/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs index a94df3e49f..2722c5067d 100644 --- a/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs +++ b/ouroboros-consensus-cardano/test/cardano-test/Test/ThreadNet/Leios.hs @@ -107,8 +107,10 @@ import Test.Consensus.Cardano.ProtocolInfo (Era (Dijkstra), hardForkInto) import Test.QuickCheck ( Property , Testable + , choose , conjoin , counterexample + , forAll , tabulate , (.&&.) , (.||.) @@ -144,7 +146,7 @@ import Test.ThreadNet.Network , TraceThreadNetNode (FromLeios, FromLeiosPeer, FromMempool) ) import Test.ThreadNet.TxGen.Cardano (CardanoTxGenExtra (..)) -import Test.ThreadNet.Util.NodeJoinPlan (trivialNodeJoinPlan) +import Test.ThreadNet.Util.NodeJoinPlan (NodeJoinPlan (..), trivialNodeJoinPlan) import Test.ThreadNet.Util.NodeRestarts (noRestarts) import Test.ThreadNet.Util.NodeToNodeVersion (newestVersion) import Test.ThreadNet.Util.NodeTopology (meshNodeTopology) @@ -159,6 +161,8 @@ tests = "Leios ThreadNet" [ adjustQuickCheckTests (`div` 10) $ testProperty "basic functionality" prop_leios + , adjustQuickCheckTests (`div` 10) $ + testProperty "late join" prop_leios_late_join ] -- | Verify a suite of basic Leios ThreadNet invariants in a single run: @@ -190,7 +194,9 @@ prop_leios seed = numSlots = 200 :: Word64 (testOutput, ProtocolInfo{pInfoConfig, pInfoInitLedger}) = - runThreadNet seed (NumSlots numSlots) (NumCoreNodes $ fromIntegral numNodes) + runThreadNet seed (NumSlots numSlots) numCoreNodes (trivialNodeJoinPlan numCoreNodes) + + numCoreNodes = NumCoreNodes $ fromIntegral numNodes traces = testOutput.allTraces @@ -343,6 +349,54 @@ prop_leios seed = | (s1, s2) <- zip certifyingBlocks (drop 1 certifyingBlocks) ] +-- | A late-joining node must catch up and converge with its peers. +-- +-- 4 nodes, 200 slots. Nodes 0–2 join at slot 0; node 3 joins at a random +-- slot, after some CertRBs may already have been produced. With the +-- late-join machinery in place the node must both (a) not crash in +-- 'resolveLeiosBlock' on a CertRB whose certified EB closure it never +-- observed live, and (b) fetch the missing closures and end up on the +-- same chain as everyone else. +prop_leios_late_join :: Seed -> Property +prop_leios_late_join seed = + -- Cap the join slot at numSlots/4 so the late node always has at least + -- 3/4 of the run to catch up. Samples near numSlots would otherwise fail + -- the chain-consistency assertion for catch-up-bandwidth reasons + -- unrelated to the late-join logic under test. + forAll (choose (1, fromIntegral numSlots `div` 4)) $ \lateJoinSlot -> + let + joinPlan = + NodeJoinPlan $ + Map.fromList + [ (CoreNodeId 0, SlotNo 0) + , (CoreNodeId 1, SlotNo 0) + , (CoreNodeId 2, SlotNo 0) + , (CoreNodeId 3, SlotNo $ fromIntegral (lateJoinSlot :: Int)) + ] + + numCoreNodes = NumCoreNodes 4 + + (testOutput, _) = + runThreadNet seed (NumSlots numSlots) numCoreNodes joinPlan + + nodeChains = + Chain.toOldestFirst . nodeOutputFinalChain <$> testOutput.testOutputNodes + in + conjoin + [ not (null nodeChains) + & counterexample "test output was empty" + , -- ChainSel hides a CertRB whose EB closure is missing instead of + -- crashing; the EB-completion re-trigger ('ebCompletionRunner') + -- plus the late-join fetch then let the node select it, so all + -- nodes converge on the same chain. + all (== head (Map.elems nodeChains)) nodeChains + & counterexample "nodes have different chains" + & counterexample ("chain lengths: " <> show (fmap length nodeChains)) + ] + & counterexample ("late join slot: " <> show lateJoinSlot) + where + numSlots = 200 :: Word64 + -- | Independently compute cumulative tx bytes by resolving each block in the -- chain (filling in EB closures from the LeiosDB) and summing individual -- 'sizeTxF' values per transaction. @@ -427,8 +481,9 @@ runThreadNet :: Seed -> NumSlots -> NumCoreNodes -> + NodeJoinPlan -> (TestOutput (CardanoBlock StandardCrypto), ProtocolInfo (CardanoBlock StandardCrypto)) -runThreadNet initSeed numSlots numCoreNodes = +runThreadNet initSeed numSlots numCoreNodes joinPlan = ( runTestNetwork testConfig testConfigB @@ -514,7 +569,7 @@ runThreadNet initSeed numSlots numCoreNodes = { forgeEbbEnv = Nothing , future = EraFinal slotLength shelleyGenesis.sgEpochLength , messageDelay = noCalcMessageDelay - , nodeJoinPlan = trivialNodeJoinPlan numCoreNodes + , nodeJoinPlan = joinPlan , nodeRestarts = noRestarts , txGenExtra = CardanoTxGenExtra diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index 143c089cc2..51d7e55322 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -52,7 +52,7 @@ import Data.Bifunctor (second) import qualified Data.ByteString as BS import Data.Data (Typeable) import Data.Either (partitionEithers) -import Data.Foldable (traverse_) +import Data.Foldable (Foldable (foldl'), traverse_) import Data.Function (on) import Data.Functor ((<&>)) import Data.Hashable (Hashable) @@ -63,6 +63,7 @@ import Data.Maybe (isJust) import qualified Data.Measure import Data.Proxy import Data.Set (Set) +import qualified Data.Set as Set import qualified Data.Text as Text import Data.Void (Void) import LeiosDemoDb @@ -73,7 +74,9 @@ import LeiosDemoDb import qualified LeiosDemoDb as LeiosDb import qualified LeiosDemoLogic as Leios import LeiosDemoTypes - ( LeiosOutstanding + ( EbAnnouncement (ebAnnouncementHash) + , EbHash + , LeiosOutstanding , LeiosPeerVars , LeiosVote (..) , TraceLeiosKernel (..) @@ -291,6 +294,7 @@ initNodeKernel :: ( IOLike m , SI.MonadTimer m , RunNode blk + , ResolveLeiosBlock blk , Ord addrNTN , Hashable addrNTN , Typeable addrNTN @@ -450,6 +454,37 @@ initNodeKernel getLeiosOutstanding <- MVar.newMVar Leios.emptyLeiosOutstanding getLeiosReady <- MVar.newEmptyMVar + -- Mirror ChainSel's pending-EB set into the Leios outstanding + -- 'missingEbBodies' so the LeiosFetch client pulls the closures for + -- CertRBs that ChainSel is holding back. A late-joining node would + -- otherwise never request these EBs, because no peer re-offers an EB + -- announced before the node joined. We use the announced byte size + -- ('getPendingCertRBs' carries it) so the fetch request matches the + -- body the peer returns. + lastAppliedPendingEBs <- newTVarIO Map.empty + void $ + forkLinkedThread registry "NodeKernel.pendingEbReconciler" $ + forever $ do + (added, removed) <- atomically $ do + new <- ChainDB.getPendingCertRBs chainDB + old <- readTVar lastAppliedPendingEBs + when (new == old) retry + writeTVar lastAppliedPendingEBs new + pure (Map.difference new old, Map.difference old new) + MVar.modifyMVar_ getLeiosOutstanding $ \outstanding -> do + let bodies0 = Leios.missingEbBodies outstanding + -- Add entries for newly-pending EBs at their announced + -- size; never overwrite an existing (offer-supplied) size. + bodies1 = + Map.foldlWithKey' + (\acc point sz -> Map.insertWith (\_new old' -> old') point sz acc) + bodies0 + added + -- Drop entries we no longer need. + bodies2 = bodies1 `Map.difference` removed + pure outstanding{Leios.missingEbBodies = bodies2} + void $ MVar.tryPutMVar getLeiosReady () + -- The Leios fetch logic: 0.5s loop that takes 'getLeiosReady', reads -- the peers' offerings, runs 'leiosFetchLogicIteration' to decide what -- to fetch next from each peer, and pushes the resulting requests onto @@ -466,6 +501,32 @@ initNodeKernel iterationStart <- getMonotonicTime leiosPeersVars <- MVar.readMVar getLeiosPeersVars offerings <- mapM (MVar.readMVar . Leios.offerings) leiosPeersVars + -- Fallback peer source: the certified-EB hashes implied by each + -- peer's ChainSync candidate fragment. A peer whose candidate + -- contains a CertRB has validated that EB's closure locally, so it + -- can serve a closure that no peer has explicitly offered (the + -- late-join case). A CertRB certifies the EB announced by its + -- immediate predecessor, so we track the running announcement while + -- folding the fragment oldest-to-newest. + candidateCertEbs <- atomically $ do + handles <- cschcMap varChainSyncHandles + fmap (Map.mapKeysMonotonic Leios.MkPeerId) $ + forM handles $ \csHandle -> do + csState <- readTVar (cschState csHandle) + let step (prevAnn, ebHashes) hwt = + let hdr = hwtHeader hwt + ebHashes' + | CertRB <- headerIsCertRB hdr + , Just ebHash <- prevAnn = + Set.insert ebHash ebHashes + | otherwise = ebHashes + in (ebAnnouncementHash <$> headerEbAnnouncement hdr, ebHashes') + pure $ + snd $ + foldl' + step + (Nothing :: Maybe EbHash, Set.empty) + (AF.toOldestFirst (csCandidate csState)) newDecisions <- MVar.modifyMVar getLeiosOutstanding $ \outstanding -> do filteredOutstanding <- Leios.filterMissingWork leiosConn outstanding traceWith leiosTr $ MkTraceLeiosKernel "leiosFetchLogic: filtered" @@ -473,6 +534,7 @@ initNodeKernel Leios.leiosFetchLogicIteration Leios.demoLeiosFetchStaticEnv offerings + candidateCertEbs filteredOutstanding pure (outstanding', decisions) traceWith leiosTr $ MkTraceLeiosKernel "leiosFetchLogic: decided" diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/HardFork/Combinator.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/HardFork/Combinator.hs index cc24e6dcab..a89cf43b2e 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/HardFork/Combinator.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/HardFork/Combinator.hs @@ -56,7 +56,7 @@ import Ouroboros.Consensus.Protocol.LeaderSchedule ( LeaderSchedule (..) , leaderScheduleFor ) -import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) +import Ouroboros.Consensus.Storage.LedgerDB (IsCertRB (..), ResolveLeiosBlock (..)) import Ouroboros.Consensus.TypeFamilyWrappers import Ouroboros.Consensus.Util.IndexedMemPack import Ouroboros.Consensus.Util.Orphans () @@ -435,9 +435,15 @@ instance HasHardForkTxOut '[BlockA, BlockB] where type TestBlock = HardForkBlock '[BlockA, BlockB] -instance ResolveLeiosBlock BlockA -instance ResolveLeiosBlock BlockB -instance ResolveLeiosBlock TestBlock +instance ResolveLeiosBlock BlockA where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock BlockB where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing +instance ResolveLeiosBlock TestBlock where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing instance CanHardFork '[BlockA, BlockB] where type HardForkTxMeasure '[BlockA, BlockB] = IgnoringOverflow ByteSize32 diff --git a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs index 70da377eef..617b1a7d1f 100644 --- a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs +++ b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Header.hs @@ -7,6 +7,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE ViewPatterns #-} @@ -79,10 +80,10 @@ import Cardano.Protocol.TPraos.BHeader (PrevHash) import Cardano.Protocol.TPraos.OCert (OCert) import Cardano.Slotting.Block (BlockNo) import Cardano.Slotting.Slot (SlotNo) -import Data.Maybe.Strict (StrictMaybe (..)) +import Data.Maybe.Strict (StrictMaybe (..), maybeToStrictMaybe, strictMaybeToMaybe) import Data.Word (Word32) import GHC.Generics (Generic) -import LeiosDemoTypes (EbAnnouncement) +import LeiosDemoTypes (EbAnnouncement, IsCertRB (..)) import NoThunks.Class (NoThunks (..)) import Ouroboros.Consensus.Protocol.Praos.VRF (InputVRF) @@ -113,6 +114,9 @@ data HeaderBody crypto = HeaderBody -- ^ Optional Leios endorser-block announcement (Dijkstra-only; -- 'SNothing' on earlier eras). Placed on the Praos header for -- early-diffusion of EB references before the body arrives. + , hbIsCertRB :: !IsCertRB + -- ^ Stub for the CIP-0164 header bit signalling that this RB + -- certifies a previously-announced EB. } deriving Generic @@ -182,7 +186,9 @@ headerHash = extractHash . hashAnnotated -- Serialisation -------------------------------------------------------------------------------- --- | 10-field encoding when 'SNothing' (pre-Leios-compatible), 11 when 'SJust'. +-- | Canonical 12-field encoding carrying @(IsCertRB, Maybe EbAnnouncement)@. +-- Decode also accepts len=10 (pre-Leios) and len=11 (announcement-only) for +-- back-compat with existing on-disk data, but those shapes are never emitted. instance Crypto crypto => EncCBOR (HeaderBody crypto) where encCBOR HeaderBody @@ -197,24 +203,28 @@ instance Crypto crypto => EncCBOR (HeaderBody crypto) where , hbOCert , hbProtVer , hbLeiosEbAnnouncement + , hbIsCertRB } = - let (len, encEbAnnouncement) = case hbLeiosEbAnnouncement of - SNothing -> (10 :: Word, mempty) - SJust ebAnnouncement -> (11, encCBOR ebAnnouncement) - in mconcat - [ encodeListLen len - , encCBOR hbBlockNo - , encCBOR hbSlotNo - , encCBOR hbPrev - , encCBOR hbVk - , encodeVerKeyVRF hbVrfVk - , encCBOR hbVrfRes - , encCBOR hbBodySize - , encCBOR hbBodyHash - , encCBOR hbOCert - , encCBOR hbProtVer - , encEbAnnouncement - ] + -- Canonical encoding: len=12 always, carrying @(Bool, Maybe + -- EbAnnouncement)@. Pre-Leios headers (len=10, no + -- announcement) and announcement-only headers (len=11) are + -- still accepted on decode for back-compat with existing + -- on-disk data, but never emitted. + mconcat + [ encodeListLen 12 + , encCBOR hbBlockNo + , encCBOR hbSlotNo + , encCBOR hbPrev + , encCBOR hbVk + , encodeVerKeyVRF hbVrfVk + , encCBOR hbVrfRes + , encCBOR hbBodySize + , encCBOR hbBodyHash + , encCBOR hbOCert + , encCBOR hbProtVer + , encCBOR hbIsCertRB + , encCBOR (strictMaybeToMaybe hbLeiosEbAnnouncement) + ] instance Crypto crypto => DecCBOR (HeaderBody crypto) where decCBOR = do @@ -229,9 +239,18 @@ instance Crypto crypto => DecCBOR (HeaderBody crypto) where hbBodyHash <- decCBOR hbOCert <- unCBORGroup <$> decCBOR hbProtVer <- decCBOR - hbLeiosEbAnnouncement <- case len of - 10 -> pure SNothing - 11 -> SJust <$> decCBOR + -- Canonical: len=12 with @(Bool, Maybe EbAnnouncement)@. + -- len=10 (pre-Leios) and len=11 (announcement-only, no CertRB + -- bit) are accepted for back-compat but no longer emitted. + (hbLeiosEbAnnouncement, hbIsCertRB) <- case len of + 10 -> pure (SNothing, NotCertRB) + 11 -> do + ebAnn <- decCBOR @EbAnnouncement + pure (SJust ebAnn, NotCertRB) + 12 -> do + isCertRB <- decCBOR @IsCertRB + mayEbAnn <- decCBOR @(Maybe EbAnnouncement) + pure (maybeToStrictMaybe mayEbAnn, isCertRB) _ -> fail $ "Praos HeaderBody CBOR has wrong length: " <> show len pure HeaderBody @@ -246,6 +265,7 @@ instance Crypto crypto => DecCBOR (HeaderBody crypto) where , hbOCert , hbProtVer , hbLeiosEbAnnouncement + , hbIsCertRB } encodeHeaderRaw :: diff --git a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs index a8e1af5be2..50f30f19fd 100644 --- a/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs +++ b/ouroboros-consensus-protocol/src/ouroboros-consensus-protocol/Ouroboros/Consensus/Protocol/Praos/Views.hs @@ -15,6 +15,7 @@ import Cardano.Protocol.TPraos.BHeader (PrevHash) import Cardano.Protocol.TPraos.OCert (OCert) import Cardano.Slotting.Slot (SlotNo) import Data.Word (Word16, Word32) +import LeiosDemoTypes (IsCertRB) import Ouroboros.Consensus.Protocol.Praos.Header (HeaderBody) import Ouroboros.Consensus.Protocol.Praos.VRF (InputVRF) @@ -36,6 +37,10 @@ data HeaderView crypto = HeaderView -- ^ Header which must be signed , hvSignature :: !(SignedKES (KES crypto) (HeaderBody crypto)) -- ^ KES Signature of the header + , hvIsCertRB :: !IsCertRB + -- ^ Stub for the CIP-0164 header bit signalling that this RB + -- certifies a previously-announced EB. Lifted onto 'HeaderView' + -- so envelope checks can read it without unpacking the body. } data LedgerView = LedgerView diff --git a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs index 29e06d6ff2..8089a25c9a 100644 --- a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs +++ b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Consensus/Protocol/Serialisation/Generators.hs @@ -22,6 +22,7 @@ import qualified Data.ByteString as BS import LeiosDemoTypes ( EbAnnouncement (EbAnnouncement) , EbHash (MkEbHash) + , IsCertRB (..) ) import Ouroboros.Consensus.Protocol.Praos (PraosState (PraosState)) import qualified Ouroboros.Consensus.Protocol.Praos as Praos @@ -33,7 +34,7 @@ import Ouroboros.Consensus.Protocol.Praos.VRF (InputVRF, mkInputVRF) import Test.Cardano.Ledger.Shelley.Serialisation.EraIndepGenerators () import Test.Cardano.StrictContainers.Instances () import Test.Crypto.KES () -import Test.QuickCheck (Arbitrary (..), Gen, choose, oneof) +import Test.QuickCheck (Arbitrary (..), Gen, choose, elements, oneof) instance Arbitrary EbHash where arbitrary = MkEbHash . BS.pack <$> vectorOfWord8 32 @@ -74,6 +75,10 @@ instance Praos.PraosCrypto c => Arbitrary (HeaderBody c) where <*> ocert <*> arbitrary <*> arbitrary + <*> arbitrary + +instance Arbitrary IsCertRB where + arbitrary = elements [NotCertRB, CertRB] instance Praos.PraosCrypto c => Arbitrary (Header c) where arbitrary = do diff --git a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs index ca923c950e..d318a0027a 100644 --- a/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs +++ b/ouroboros-consensus-protocol/src/unstable-protocol-testlib/Test/Ouroboros/Consensus/Protocol/Praos/Header.hs @@ -97,6 +97,7 @@ import Data.Ratio ((%)) import Data.Text.Encoding (decodeUtf8, encodeUtf8) import Data.Word (Word64) import GHC.Generics (Generic) +import LeiosDemoTypes (IsCertRB (..)) import Ouroboros.Consensus.Protocol.Praos (PraosValidationErr (..)) import Ouroboros.Consensus.Protocol.Praos.Header ( Header @@ -448,6 +449,7 @@ genHeaderBody context = do (hbOCert, kesPeriod) <- genCert hbSlotNo context let hbProtVer = protocolVersionZero hbLeiosEbAnnouncement = SNothing + hbIsCertRB = NotCertRB headerBody = HeaderBody{..} pure $ (headerBody, kesPeriod) where diff --git a/ouroboros-consensus/bench/leios-db-bench/Main.hs b/ouroboros-consensus/bench/leios-db-bench/Main.hs index 0e63e3000b..50ffff875a 100644 --- a/ouroboros-consensus/bench/leios-db-bench/Main.hs +++ b/ouroboros-consensus/bench/leios-db-bench/Main.hs @@ -231,7 +231,7 @@ insertOneEb conn ebIdx = do , let h = genTxHash ebIdx txIdx ] leiosDbInsertEbPoint conn point (leiosEbBytesSize eb) - leiosDbInsertEbBody conn point eb + _ <- leiosDbInsertEbBody conn point eb _ <- leiosDbInsertTxs conn txs pure () diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/Common.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/Common.hs index 37ab7e2e5c..784658e6fb 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/Common.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/Common.hs @@ -14,6 +14,7 @@ import Cardano.Slotting.Slot (SlotNo) import Control.Concurrent.Class.MonadSTM.Strict (StrictTChan) import Data.ByteString (ByteString) import Data.Map.Strict (Map) +import Data.Set (Set) import GHC.Stack (HasCallStack) import LeiosDemoTypes ( BytesSize @@ -38,6 +39,17 @@ data LeiosDbHandle m = LeiosDbHandle -- TODO: make return type more descriptive (e.g. Subscription { getNext :: STM m LeiosEbNotification }) , open :: m (LeiosDbConnection m) -- ^ Open a new connection to the LeiosDb. + , readCompletedClosures :: HasCallStack => m (Set EbHash) + -- ^ EB hashes for which the corresponding EB closure is complete: + -- the body is stored locally and every referenced tx is present. + -- Backed by a cache the handle seeds at construction and updates + -- inside the insert paths, so the read is O(1) on the ChainSel hot + -- path. + -- + -- TODO: cap the cache. Only EBs within @k@ of the immutable tip can + -- be referenced by a candidate chain, so the cache only needs to hold + -- that window; older entries can be evicted and answered by a DB + -- query on miss. } data LeiosEbNotification @@ -61,13 +73,19 @@ data LeiosDbConnection m = LeiosDbConnection -- - Missing EB bodies: EBs in ebs with NULL missingTxCount -- - Missing TXs: TXs in ebTxs without entries in txs -- NOTE: This is O(n) and should only be used at startup for initialization. - , -- NOTE: yields a LeiosOfferBlock notification - leiosDbInsertEbBody :: HasCallStack => LeiosPoint -> LeiosEb -> m () + , -- NOTE: yields an 'AcquiredEb' notification. Additionally, for + -- every EB whose closure has become complete via this body insert + -- (i.e. all referenced txs are already present, which can happen + -- when two EBs reference the same 'TxHash'es and the txs were + -- inserted in service of the other EB), yields an 'AcquiredEbTxs' + -- notification and returns the completed 'LeiosPoint's. Mirrors + -- the 'CompletedEbs' return of 'leiosDbInsertTxs'. + leiosDbInsertEbBody :: HasCallStack => LeiosPoint -> LeiosEb -> m CompletedEbs , -- TODO: Take [LeiosTx] and hash on insert? leiosDbInsertTxs :: HasCallStack => [(TxHash, ByteString)] -> m CompletedEbs -- ^ Insert transactions into the global txs table (INSERT OR IGNORE). -- After inserting, checks which EBs referencing these txs are now complete - -- and emits LeiosOfferBlockTxs notifications for each. + -- and emits 'AcquiredEbTxs' notifications for each. -- -- NOTE: Duplicate notifications may be emitted if the same EB becomes -- complete via multiple insert batches (e.g., if txs are inserted twice). diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/InMemory.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/InMemory.hs index 66d4097d34..ded387bc99 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/InMemory.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/InMemory.hs @@ -21,14 +21,18 @@ import Control.Concurrent.Class.MonadSTM.Strict , newBroadcastTChan , newTVarIO , readTVar + , readTVarIO , writeTChan ) +import Control.Monad (unless) import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IntMap import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import Data.Set (Set) +import qualified Data.Set as Set import LeiosDemoDb.Common ( CompletedEbs , LeiosDbConnection (..) @@ -85,6 +89,13 @@ newLeiosDBInMemory = do newLeiosDBInMemoryWith :: IOLike m => StrictTVar m InMemoryLeiosDb -> m (LeiosDbHandle m) newLeiosDBInMemoryWith stateVar = do notificationChan <- atomically newBroadcastTChan + -- Seed from whatever state the caller's 'stateVar' holds at + -- construction. Empty for 'newLeiosDBInMemory'; tests that inject + -- pre-populated state via 'newLeiosDBInMemoryWith' rely on the + -- cache reflecting it. + closuresVar <- do + initial <- imComputeCompletedClosures <$> readTVarIO stateVar + newTVarIO initial pure $ LeiosDbHandle { subscribeEbNotifications = @@ -97,8 +108,8 @@ newLeiosDBInMemoryWith stateVar = do , leiosDbLookupEbPoint = imLookupEbPoint stateVar , leiosDbInsertEbPoint = imInsertEbPoint stateVar , leiosDbLookupEbBody = imLookupEbBody stateVar - , leiosDbInsertEbBody = imInsertEbBody stateVar notificationChan - , leiosDbInsertTxs = imInsertTxs stateVar notificationChan + , leiosDbInsertEbBody = imInsertEbBody stateVar notificationChan closuresVar + , leiosDbInsertTxs = imInsertTxs stateVar notificationChan closuresVar , leiosDbBatchRetrieveTxs = imBatchRetrieveTxs stateVar , leiosDbFilterMissingEbBodies = imFilterMissingEbBodies stateVar , leiosDbFilterMissingTxs = imFilterMissingTxs stateVar @@ -106,8 +117,21 @@ newLeiosDBInMemoryWith stateVar = do , leiosDbQueryCompletedEbByPoint = imQueryCompletedEbByPoint stateVar , leiosDbQueryCertificateByPoint = return . Just . trustNoVerifyLeiosCertificate } + , readCompletedClosures = readTVarIO closuresVar } +-- | EB hashes whose closure is complete in the given in-memory state: +-- body stored and every referenced tx present in 'imTxs'. Mirrors the +-- predicate inside 'imInsertEbBody' and 'imInsertTxs'; pulled out so +-- the seed and the per-insert update use the same definition. +imComputeCompletedClosures :: InMemoryLeiosDb -> Set EbHash +imComputeCompletedClosures st = + Set.fromList + [ ebHash + | (ebHash, entries) <- Map.toList (imEbBodies st) + , all (\e -> Map.member (eteTxHash e) (imTxs st)) (IntMap.elems entries) + ] + -- * Top-level implementations imScanEbPoints :: IOLike m => StrictTVar m InMemoryLeiosDb -> m [(SlotNo, EbHash)] @@ -153,10 +177,11 @@ imInsertEbBody :: IOLike m => StrictTVar m InMemoryLeiosDb -> StrictTChan m LeiosEbNotification -> + StrictTVar m (Set EbHash) -> LeiosPoint -> LeiosEb -> - m () -imInsertEbBody stateVar notificationChan point eb = do + m CompletedEbs +imInsertEbBody stateVar notificationChan closuresVar point eb = do let items = leiosEbBodyItems eb when (null items) $ error "leiosDbInsertEbBody: empty EB body (programmer error)" @@ -177,14 +202,27 @@ imInsertEbBody stateVar notificationChan point eb = do , imEbSlots = Map.insert point.pointEbHash point.pointSlotNo (imEbSlots s) } writeTChan notificationChan $ AcquiredEb point (leiosEbBytesSize eb) + -- A body insert can complete at most its own EB's closure, and only + -- when all of its txs are already present. See the haddock of + -- 'leiosDbInsertEbBody' for the scenario. + state <- readTVar stateVar + let allTxsPresent = + all (\e -> Map.member (eteTxHash e) (imTxs state)) (IntMap.elems entries) + if allTxsPresent + then do + writeTChan notificationChan $ AcquiredEbTxs point + modifyTVar closuresVar (Set.insert point.pointEbHash) + pure [point] + else pure [] imInsertTxs :: IOLike m => StrictTVar m InMemoryLeiosDb -> StrictTChan m LeiosEbNotification -> + StrictTVar m (Set EbHash) -> [(TxHash, ByteString)] -> m CompletedEbs -imInsertTxs stateVar notificationChan txs = atomically $ do +imInsertTxs stateVar notificationChan closuresVar txs = atomically $ do let insertedTxHashes = [txHash | (txHash, _) <- txs] forM_ txs $ \(txHash, txBytes) -> do let txBytesSize = fromIntegral $ BS.length txBytes @@ -201,6 +239,9 @@ imInsertTxs stateVar notificationChan txs = atomically $ do , slot <- maybeToList $ Map.lookup ebHash (imEbSlots state) ] forM_ completed $ writeTChan notificationChan . AcquiredEbTxs + unless (null completed) $ + modifyTVar closuresVar $ + Set.union (Set.fromList (map pointEbHash completed)) pure completed imBatchRetrieveTxs :: diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/SQLite.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/SQLite.hs index bd800a538c..c0a3d4d71c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/SQLite.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoDb/SQLite.hs @@ -20,12 +20,16 @@ import Cardano.Slotting.Slot (SlotNo (..)) import Control.Concurrent (threadDelay) import Control.Concurrent.Class.MonadSTM.Strict ( StrictTChan + , StrictTVar , dupTChan + , modifyTVar , newBroadcastTChan + , newTVarIO + , readTVarIO , writeTChan ) import Control.Exception (throwIO) -import Control.Monad (void) +import Control.Monad (unless, void) import Control.Monad.Class.MonadThrow ( bracket , generalBracket @@ -36,6 +40,8 @@ import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.Int (Int64) import qualified Data.Map.Strict as Map +import Data.Set (Set) +import qualified Data.Set as Set import Data.String (fromString) import Database.SQLite3 ( SQLOpenFlag (..) @@ -89,17 +95,32 @@ newLeiosDBSQLiteFromEnv = do newLeiosDBSQLite :: FilePath -> IO (LeiosDbHandle IO) newLeiosDBSQLite dbPath = do notificationChan <- atomically newBroadcastTChan + -- Seed the closure cache from persisted state. Short-lived + -- connection: opened, queried, closed. Also guarantees the schema + -- exists before any later 'open' call. + closuresVar <- do + seedDb <- openSQLiteDb dbPath + initial <- sqlReadCompletedClosures seedDb + void $ DB.close seedDb + newTVarIO initial pure $ LeiosDbHandle { subscribeEbNotifications = atomically (dupTChan notificationChan) - , open = openSQLiteConnection dbPath notificationChan + , open = openSQLiteConnection dbPath notificationChan closuresVar + , readCompletedClosures = readTVarIO closuresVar } -- * Connection management -openSQLiteConnection :: FilePath -> StrictTChan IO LeiosEbNotification -> IO (LeiosDbConnection IO) -openSQLiteConnection dbPath notificationChan = do +-- | Open a connection and bring it to a usable state: pragmas, schema +-- init (on a fresh file), and the in-memory @mem.txPoints@ attach. +-- Shared by 'openSQLiteConnection' (each connection that backs a +-- 'LeiosDbConnection') and by the closure-cache seed in +-- 'newLeiosDBSQLite' (one short-lived connection at handle +-- construction). +openSQLiteDb :: FilePath -> IO DB.Database +openSQLiteDb dbPath = do shouldInitSchema <- not <$> doesFileExist dbPath db <- open2 (fromString dbPath) [SQLOpenReadWrite, SQLOpenCreate] SQLVFSDefault traverse_ (dbExec db) $ @@ -111,6 +132,15 @@ openSQLiteConnection dbPath notificationChan = do when shouldInitSchema $ dbExec db (fromString sql_schema) dbExec db (fromString sql_attach_memTxPoints) + pure db + +openSQLiteConnection :: + FilePath -> + StrictTChan IO LeiosEbNotification -> + StrictTVar IO (Set EbHash) -> + IO (LeiosDbConnection IO) +openSQLiteConnection dbPath notificationChan closuresVar = do + db <- openSQLiteDb dbPath let notify = atomically . writeTChan notificationChan pure $ LeiosDbConnection @@ -119,8 +149,8 @@ openSQLiteConnection dbPath notificationChan = do , leiosDbLookupEbPoint = sqlLookupEbPoint db , leiosDbInsertEbPoint = sqlInsertEbPoint db , leiosDbLookupEbBody = sqlLookupEbBody db - , leiosDbInsertEbBody = sqlInsertEbBody db notify - , leiosDbInsertTxs = sqlInsertTxs db notify + , leiosDbInsertEbBody = sqlInsertEbBody db notify closuresVar + , leiosDbInsertTxs = sqlInsertTxs db notify closuresVar , leiosDbBatchRetrieveTxs = sqlBatchRetrieveTxs db , leiosDbFilterMissingEbBodies = sqlFilterMissingEbBodies db , leiosDbFilterMissingTxs = sqlFilterMissingTxs db @@ -174,12 +204,18 @@ sqlLookupEbBody db ebHash = loop ((txHash, size) : acc) loop [] -sqlInsertEbBody :: DB.Database -> (LeiosEbNotification -> IO ()) -> LeiosPoint -> LeiosEb -> IO () -sqlInsertEbBody db notify point eb = do +sqlInsertEbBody :: + DB.Database -> + (LeiosEbNotification -> IO ()) -> + StrictTVar IO (Set EbHash) -> + LeiosPoint -> + LeiosEb -> + IO CompletedEbs +sqlInsertEbBody db notify closuresVar point eb = do let items = leiosEbBodyItems eb when (null items) $ error "leiosDbInsertEbBody: empty EB body (programmer error)" - dbWithBEGIN db $ do + completed <- dbWithBEGIN db $ do dbWithPrepare db (fromString sql_insert_ebBody) $ \stmt -> mapM_ ( \(txOffset, txHash, txBytesSize) -> do @@ -197,11 +233,22 @@ sqlInsertEbBody db notify point eb = do dbBindBlob stmt 2 point.pointEbHash.ebHashBytes dbBindInt64 stmt 3 (fromIntegral $ unSlotNo point.pointSlotNo) dbStep1 stmt + -- A body insert can complete an EB's closure on the spot when all its + -- txs happen to be in the DB already (e.g. another EB references the + -- same 'TxHash'es and completed first). Run the same completion + -- finder + mark-notified step that 'sqlInsertTxs' uses. + findAndMarkCompletedEbs db notify $ AcquiredEb point (leiosEbBytesSize eb) + notifyAndCacheCompleted notify closuresVar completed + pure completed sqlInsertTxs :: - DB.Database -> (LeiosEbNotification -> IO ()) -> [(TxHash, ByteString)] -> IO CompletedEbs -sqlInsertTxs db notify txs = do + DB.Database -> + (LeiosEbNotification -> IO ()) -> + StrictTVar IO (Set EbHash) -> + [(TxHash, ByteString)] -> + IO CompletedEbs +sqlInsertTxs db notify closuresVar txs = do completed <- dbWithBEGIN db $ do stmtInsert <- dbPrepare db (fromString sql_insert_tx) stmtDecr <- dbPrepare db (fromString sql_decrement_missing_tx_count) @@ -221,24 +268,70 @@ sqlInsertTxs db notify txs = do dbReset stmtDecr dbFinalize stmtInsert dbFinalize stmtDecr - -- Find newly-complete EBs (missingTxCount reached 0) - completed <- dbWithPrepare db (fromString sql_find_complete_ebs) $ \stmt -> do - let loop acc = - dbStep stmt >>= \case - DB.Done -> pure (reverse acc) - DB.Row -> do - ebHash <- MkEbHash <$> DB.columnBlob stmt 0 - slot <- SlotNo . fromIntegral <$> DB.columnInt64 stmt 1 - loop (MkLeiosPoint slot ebHash : acc) - loop [] - -- Mark them as notified so they are not found again - dbWithPrepare db (fromString sql_mark_notified_ebs) $ \stmt -> - dbStep1 stmt - pure completed - -- Emit notifications for each completed EB - forM_ completed $ notify . AcquiredEbTxs + findAndMarkCompletedEbs db + notifyAndCacheCompleted notify closuresVar completed pure completed +-- | Find every EB whose closure has just transitioned to complete +-- (@missingTxCount = 0@), then mark them as notified so a subsequent +-- call does not return them again. +-- +-- Must run inside a 'dbWithBEGIN' so the SELECT and the UPDATE are +-- atomic: otherwise two concurrent insert paths could both observe +-- the same row as just-completed. +-- +-- TODO: this runs an unindexed scan of @ebs@ on every insert call. +-- See the late-join plan, follow-up "gate findAndMarkCompletedEbs on +-- a cheap pre-check". +findAndMarkCompletedEbs :: DB.Database -> IO CompletedEbs +findAndMarkCompletedEbs db = do + completed <- dbWithPrepare db (fromString sql_find_complete_ebs) $ \stmt -> + let loop acc = + dbStep stmt >>= \case + DB.Done -> pure (reverse acc) + DB.Row -> do + ebHash <- MkEbHash <$> DB.columnBlob stmt 0 + slot <- SlotNo . fromIntegral <$> DB.columnInt64 stmt 1 + loop (MkLeiosPoint slot ebHash : acc) + in loop [] + dbWithPrepare db (fromString sql_mark_notified_ebs) $ \stmt -> + dbStep1 stmt + pure completed + +-- | Emit @AcquiredEbTxs@ notifications and add the closures to the +-- in-process cache. Runs after the insert's COMMIT. A reader between +-- COMMIT and this call sees the pre-update cache: the DB row already +-- has @missingTxCount = -1@, but the cache does not yet contain the +-- EB hash, so ChainSel sees the EB as absent and treats the certifying +-- RB as pending. Conservative direction for ChainSel (more CertRBs +-- filtered, not fewer — perf cost only, self-corrects on the next +-- @chainSelectionForBlock@). +notifyAndCacheCompleted :: + (LeiosEbNotification -> IO ()) -> + StrictTVar IO (Set EbHash) -> + CompletedEbs -> + IO () +notifyAndCacheCompleted notify closuresVar completed = do + forM_ completed $ notify . AcquiredEbTxs + unless (null completed) $ + atomically $ + modifyTVar closuresVar $ + Set.union (Set.fromList (map pointEbHash completed)) + +-- | Snapshot of every EB whose closure is complete. Backs the seed +-- in 'newLeiosDBSQLite'; the cache is then maintained by the insert +-- paths. +sqlReadCompletedClosures :: DB.Database -> IO (Set EbHash) +sqlReadCompletedClosures db = + dbWithPrepare db (fromString sql_select_completed_closures) $ \stmt -> + let go acc = + dbStep stmt >>= \case + DB.Done -> pure acc + DB.Row -> do + h <- MkEbHash <$> DB.columnBlob stmt 0 + go (Set.insert h acc) + in go Set.empty + sqlBatchRetrieveTxs :: DB.Database -> EbHash -> [Int] -> IO [(Int, TxHash, Maybe ByteString)] sqlBatchRetrieveTxs db ebHash offsets = dbWithBEGIN db $ do @@ -495,6 +588,14 @@ sql_mark_notified_ebs :: String sql_mark_notified_ebs = "UPDATE ebs SET missingTxCount = -1 WHERE missingTxCount = 0" +-- | Every EB whose closure is complete in the persisted state: body +-- stored locally and every referenced tx present. Covers both +-- @missingTxCount = 0@ (just completed, unnotified) and @-1@ (notified); +-- both states mean the closure is in the DB. +sql_select_completed_closures :: String +sql_select_completed_closures = + "SELECT ebHashBytes FROM ebs WHERE missingTxCount IS NOT NULL AND missingTxCount <= 0" + -- | Decrement missingTxCount for all EBs referencing the given txHash. -- Parameter 1: txHashBytes sql_decrement_missing_tx_count :: String diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs index bba89278b8..07284b29e8 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoLogic.hs @@ -262,9 +262,16 @@ leiosFetchLogicIteration :: Ord pid => LeiosFetchStaticEnv -> Map (PeerId pid) (Set EbHash, Set EbHash) -> + -- | Per-peer certified EB hashes inferred from ChainSync candidate + -- fragments. Used as a fallback peer source when no peer has offered + -- the EB body (e.g. for CertRBs whose closure pre-dates the local + -- node, as happens on a late join). A peer whose candidate chain + -- contains a CertRB for an EB must hold that EB's full closure, so it + -- can serve both the body and the txs. + Map (PeerId pid) (Set EbHash) -> LeiosOutstanding pid -> (LeiosOutstanding pid, LeiosFetchDecisions pid) -leiosFetchLogicIteration env offerings = +leiosFetchLogicIteration env offerings candidateCertEbs = \acc -> go1 acc emptyLeiosFetchDecisions $ expand $ @@ -330,17 +337,25 @@ leiosFetchLogicIteration env offerings = choosePeerEb :: Set (PeerId pid) -> LeiosOutstanding pid -> EbHash -> Maybe (PeerId pid) choosePeerEb peerIds acc ebHash = - foldr (\a _ -> Just a) Nothing $ - [ peerId - | (peerId, (ebHashes, _ebHashes)) <- - Map.toList $ -- TODO prioritize/shuffle? - (`Map.withoutKeys` peerIds) $ -- not already requested from this peer - offerings - , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) - <= Leios.maxRequestedBytesSizePerPeer env - , -- peer can be sent more requests - ebHash `Set.member` ebHashes -- peer has offered this EB body - ] + case pickFrom (Map.map fst offerings) of + Just peerId -> Just peerId + -- No peer has offered this EB body; fall back to peers whose + -- ChainSync candidate fragment includes a CertRB for this EB. + Nothing -> pickFrom candidateCertEbs + where + pickFrom :: Map (PeerId pid) (Set EbHash) -> Maybe (PeerId pid) + pickFrom source = + foldr (\a _ -> Just a) Nothing $ + [ peerId + | (peerId, ebHashes) <- + Map.toList $ -- TODO prioritize/shuffle? + (`Map.withoutKeys` peerIds) $ -- not already requested from this peer + source + , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) + <= Leios.maxRequestedBytesSizePerPeer env + , -- peer can be sent more requests + ebHash `Set.member` ebHashes + ] goTx2 :: LeiosOutstanding pid -> @@ -388,20 +403,29 @@ leiosFetchLogicIteration env offerings = BytesSize -> Maybe (PeerId pid, Map EbHash Int) choosePeerTx peerIds acc txOffsets targetTxBytesSize = - foldr (\a _ -> Just a) Nothing $ - [ (peerId, Map.map fst txOffsets') - | (peerId, (_ebIds, ebIds)) <- - Map.toList $ -- TODO prioritize/shuffle? - (`Map.withoutKeys` peerIds) $ -- not already requested from this peer - offerings - , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) - <= Leios.maxRequestedBytesSizePerPeer env - , -- peer can be sent more requests - let txOffsets' = txOffsets `Map.restrictKeys` ebIds - , case Map.lookupMax txOffsets' of - Nothing -> False - Just (_ebHash, (_txOffset, txBytesSize)) -> targetTxBytesSize == txBytesSize -- peer has offered at least one EB closure that includes this tx with the same size - ] + case pickFrom (Map.map snd offerings) of + Just hit -> Just hit + -- Same fallback as 'choosePeerEb': a peer whose candidate chain + -- contains a CertRB for this EB validated the full closure locally + -- and therefore also holds its txs. + Nothing -> pickFrom candidateCertEbs + where + pickFrom :: Map (PeerId pid) (Set EbHash) -> Maybe (PeerId pid, Map EbHash Int) + pickFrom source = + foldr (\a _ -> Just a) Nothing $ + [ (peerId, Map.map fst txOffsets') + | (peerId, ebIds) <- + Map.toList $ -- TODO prioritize/shuffle? + (`Map.withoutKeys` peerIds) $ -- not already requested from this peer + source + , Map.findWithDefault 0 peerId (Leios.requestedBytesSizePerPeer acc) + <= Leios.maxRequestedBytesSizePerPeer env + , -- peer can be sent more requests + let txOffsets' = txOffsets `Map.restrictKeys` ebIds + , case Map.lookupMax txOffsets' of + Nothing -> False + Just (_ebHash, (_txOffset, txBytesSize)) -> targetTxBytesSize == txBytesSize + ] packRequests :: LeiosFetchStaticEnv -> @@ -592,8 +616,9 @@ msgLeiosBlock ktracer tracer (outstandingVar, readyVar) db peerId req eb = do -- duplicates, but we should have not even fetched it. -- REVIEW: ^^^^ this should be resolved -- TODO: This was encountered again, but likely because of a race on two fetches. - leiosDbInsertEbBody db point eb + completed <- leiosDbInsertEbBody db point eb traceWith ktracer $ TraceLeiosBlockAcquired point + forM_ completed $ traceWith ktracer . TraceLeiosBlockTxsAcquired -- update NodeKernel state let !outstanding' = if not novel diff --git a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs index ef2c199552..06f66962f1 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/LeiosDemoTypes.hs @@ -19,7 +19,7 @@ module LeiosDemoTypes (module LeiosDemoTypes) where import Cardano.Binary (FromCBOR (fromCBOR), ToCBOR, enforceSize, serialize', toCBOR) import qualified Cardano.Crypto.Hash as Hash -import Cardano.Ledger.Binary (DecCBOR, EncCBOR) +import Cardano.Ledger.Binary (DecCBOR (..), EncCBOR (..)) import Cardano.Ledger.Core (EraTx, Tx, TxLevel (TopTx)) import Cardano.Prelude (NFData, NonEmpty, toList, toString, (&)) import Cardano.Slotting.Slot (SlotNo (SlotNo)) @@ -144,6 +144,23 @@ instance FromCBOR EbAnnouncement where enforceSize "EbAnnouncement" 2 EbAnnouncement <$> decode <*> decode +-- | Whether a Praos RB certifies a previously-announced Leios EB +-- (CIP-0164). Wire format is @Bool@: the 'EncCBOR'/'DecCBOR' +-- instances map @NotCertRB <-> False@ and @CertRB <-> True@ so +-- on-disk and over-the-wire header bytes are unchanged. +data IsCertRB = NotCertRB | CertRB + deriving stock (Eq, Show, Generic) + deriving anyclass NoThunks + +instance EncCBOR IsCertRB where + encCBOR = + encCBOR . \case + NotCertRB -> False + CertRB -> True + +instance DecCBOR IsCertRB where + decCBOR = (\b -> if b then CertRB else NotCertRB) <$> decCBOR + -- * Fetch logic types type BytesSize = Word32 diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Ledger/Dual.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Ledger/Dual.hs index 584fcdeacc..208a6b95fe 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Ledger/Dual.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Ledger/Dual.hs @@ -138,6 +138,9 @@ instance instance (Typeable m, Typeable a) => ResolveLeiosBlock (DualBlock m a) + where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing instance Condense m => Condense (DualBlock m a) where condense = condense . dualBlockMain diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs index 158387eeda..9ea00fe593 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -81,8 +81,10 @@ module Ouroboros.Consensus.Storage.ChainDB.API import Control.Monad (void) import Control.ResourceRegistry +import Data.Map.Strict (Map) import Data.Typeable (Typeable) import GHC.Generics (Generic) +import LeiosDemoTypes (BytesSize, LeiosPoint) import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime.WallClock.Types (WithArrivalTime) import Ouroboros.Consensus.HeaderStateHistory @@ -170,6 +172,12 @@ data ChainDB m blk = ChainDB -- https://github.com/IntersectMBO/ouroboros-consensus/blob/main/docs/website/contents/for-developers/HandlingBlocksFromTheFuture.md#handling-blocks-from-the-future , chainSelAsync :: m (ChainSelectionPromise m) -- ^ Trigger reprocessing of blocks postponed by the LoE. + , getPendingCertRBs :: STM m (Map LeiosPoint BytesSize) + -- ^ EB closures that chain selection is waiting on (a volatile CertRB + -- certifies them but the closure is not yet local), keyed by the EB's + -- 'LeiosPoint' with its announced byte size. The late-join fetch + -- mirrors these into the Leios outstanding set so the missing closures + -- get pulled from peers. , getCurrentChain :: STM m (AnchoredFragment (Header blk)) -- ^ Get the current chain fragment -- diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index 32f2e6d2f8..6de3c4f1d8 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -244,6 +244,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do chainSelFuse <- newFuse "chain selection" chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs) varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing + varPendingEBs <- newTVarIO Map.empty let env = CDB @@ -270,6 +271,8 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , cdbLoE = Args.cdbsLoE cdbSpecificArgs , cdbChainSelStarvation = varChainSelStarvation , cdbPerasCertDB = perasCertDB + , cdbLeiosDbHandle = Args.cdbsLeiosDb cdbSpecificArgs + , cdbPendingEBs = varPendingEBs } setGetCurrentChainForLedgerDB $ Query.getCurrentChain env @@ -279,6 +282,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do API.ChainDB { addBlockAsync = getEnv2 h ChainSel.addBlockAsync , chainSelAsync = getEnv h ChainSel.triggerChainSelectionAsync + , getPendingCertRBs = getEnvSTM h (readTVar . cdbPendingEBs) , getCurrentChain = getEnvSTM h Query.getCurrentChain , getCurrentChainWithTime = getEnvSTM h Query.getCurrentChainWithTime , getTipBlock = getEnv h Query.getTipBlock diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 60911ad8b4..32f16204de 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -41,6 +41,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background , addBlockRunner ) where +import Control.Concurrent.Class.MonadSTM.Strict (readTChan) import Control.Exception (assert) import Control.Monad (forM_, forever, void, when) import Control.Monad.Trans.Class (lift) @@ -56,6 +57,7 @@ import Data.Void (Void) import Data.Word import GHC.Generics (Generic) import GHC.Stack (HasCallStack) +import LeiosDemoDb (subscribeEbNotifications) import Ouroboros.Consensus.Block import Ouroboros.Consensus.HardFork.Abstract import Ouroboros.Consensus.Ledger.Inspect @@ -92,6 +94,7 @@ launchBgTasks :: , BlockSupportsDiffusionPipelining blk , InspectLedger blk , HasHardForkHistory blk + , LedgerDB.ResolveLeiosBlock blk ) => ChainDbEnv m blk -> -- | Number of immutable blocks replayed on ledger DB startup @@ -102,6 +105,10 @@ launchBgTasks cdb@CDB{..} replayed = do launch "ChainDB.addBlockRunner" $ addBlockRunner cdbChainSelFuse cdb + !ebCompletionThread <- + launch "ChainDB.ebCompletionRunner" $ + ebCompletionRunner cdb + ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed !ledgerDbMaintenaceThread <- forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $ @@ -121,6 +128,7 @@ launchBgTasks cdb@CDB{..} replayed = do writeTVar cdbKillBgThreads $ sequence_ [ addBlockThread + , ebCompletionThread , cancelThread ledgerDbMaintenaceThread , gcThread , copyToImmutableDBThread @@ -610,6 +618,7 @@ addBlockRunner :: , InspectLedger blk , HasHardForkHistory blk , HasCallStack + , LedgerDB.ResolveLeiosBlock blk ) => Fuse m -> ChainDbEnv m blk -> @@ -655,3 +664,30 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do ) where starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent + +-- | Re-run chain selection whenever a Leios EB closure becomes available. +-- +-- 'computeCertRBsWithPendingEbClosures' hides a CertRB from candidate +-- selection while its certified EB closure is missing. Once the closure +-- arrives (via diffusion or the late-join fetch) the filter stops hiding +-- it, but nothing would re-run chain selection for that block. This +-- thread watches the LeiosDB notification stream and, on every +-- acquisition, enqueues a LoE-style reprocess: the previously-deferred +-- CertRB is a successor of a block on the current chain, so the reprocess +-- reconsiders it and it can now be selected. +-- +-- This deliberately re-uses the existing LoE reprocess path rather than +-- tracking pending CertRBs in dedicated state: the filter is recomputed +-- from scratch on every pass, so a blanket re-trigger is enough. +ebCompletionRunner :: + IOLike m => + ChainDbEnv m blk -> + m Void +ebCompletionRunner CDB{..} = do + notifChan <- subscribeEbNotifications cdbLeiosDbHandle + forever $ do + _ <- atomically $ readTChan notifChan + void $ + addReprocessLoEBlocks + (contramap TraceAddBlockEvent cdbTracer) + cdbChainSelQueue diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs index 7a341a2f5a..45715b8ba7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs @@ -36,6 +36,7 @@ import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.State.Strict import Control.Tracer (Tracer, nullTracer, traceWith) import Data.Bifunctor (first) +import Data.Foldable (Foldable (foldl')) import Data.Function (on) import Data.Functor.Contravariant ((>$<)) import Data.List (sortBy) @@ -49,6 +50,13 @@ import Data.Set (Set) import qualified Data.Set as Set import Data.Traversable (for) import GHC.Stack (HasCallStack) +import LeiosDemoDb (LeiosDbHandle (..)) +import LeiosDemoTypes + ( BytesSize + , EbAnnouncement (ebAnnouncementHash, ebAnnouncementSize) + , EbHash + , LeiosPoint (..) + ) import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime.WallClock.Types (WithArrivalTime) import Ouroboros.Consensus.Config @@ -99,6 +107,7 @@ import Ouroboros.Consensus.Util.AnchoredFragment import Ouroboros.Consensus.Util.EarlyExit (exitEarly, withEarlyExit_) import Ouroboros.Consensus.Util.Enclose (encloseWith) import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.RedundantConstraints (keepRedundantConstraint) import Ouroboros.Consensus.Util.STM (WithFingerprint (..)) import Ouroboros.Network.AnchoredFragment ( Anchor @@ -332,6 +341,7 @@ chainSelSync :: , InspectLedger blk , HasHardForkHistory blk , HasCallStack + , ResolveLeiosBlock blk ) => ChainDbEnv m blk -> ChainSelMessage m blk -> @@ -586,6 +596,7 @@ chainSelectionForBlock :: , InspectLedger blk , HasHardForkHistory blk , HasCallStack + , ResolveLeiosBlock blk ) => ChainDbEnv m blk -> BlockCache blk -> @@ -671,6 +682,7 @@ constructPreferableCandidates :: forall m blk. ( IOLike m , BlockSupportsProtocol blk + , ResolveLeiosBlock blk ) => ChainDbEnv m blk -> PerasWeightSnapshot blk -> @@ -684,12 +696,40 @@ constructPreferableCandidates :: -- preferable to the current chain. m [(ChainDiff (Header blk), ReasonForSwitch' blk)] constructPreferableCandidates CDB{..} weights curChain hdrCache p = do - (succsOf, lookupBlockInfo) <- atomically $ do + (succsOfValid, lookupBlockInfoValid) <- atomically $ do invalid <- forgetFingerprint <$> readTVar cdbInvalid (,) <$> (ignoreInvalidSuc p invalid <$> VolatileDB.filterByPredecessor cdbVolatileDB) <*> (ignoreInvalid p invalid <$> VolatileDB.getBlockInfo cdbVolatileDB) + -- Also hide volatile CertRBs whose certified EB closure has not been + -- downloaded locally, so candidate construction omits the CertRB and + -- (transitively) anything reachable through it; otherwise + -- 'resolveLeiosBlock' would crash on the block-add path. The read is + -- O(1) because the handle caches the snapshot (see + -- 'readCompletedClosures'). + acquiredClosures <- readCompletedClosures cdbLeiosDbHandle + pendingCertRBs <- + computeCertRBsWithPendingEbClosures + cdbVolatileDB + succsOfValid + acquiredClosures + (pointHash (castPoint (AF.anchorPoint curChain) :: Point blk)) + -- Publish the current pending set, keyed by the missing EB's + -- 'LeiosPoint' with that EB's announced byte size, so the late-join + -- fetch ('NodeKernel.pendingEbReconciler') knows which EB closures to + -- pull and with what expected size. + atomically $ writeTVar cdbPendingEBs (Map.map snd pendingCertRBs) + + let + -- Let these two functions ignore invalid blocks and CertRBs whose + -- certified EB closure is still pending. + certRBsWithPendingEbClosures = Set.fromList (fst <$> Map.elems pendingCertRBs) + succsOf = + Set.filter (`Set.notMember` certRBsWithPendingEbClosures) . succsOfValid + lookupBlockInfo = + ignorePendingCertRBs certRBsWithPendingEbClosures lookupBlockInfoValid + loeFrag <- fmap sanitizeLoEFrag <$> cdbLoE traceWith addBlockTracer @@ -1404,3 +1444,84 @@ compareChainDiffs bcfg weights curChain = mkCand = fromMaybe (error "compareChainDiffs: precondition violated") . Diff.apply curChain + +-- | Wrap a @getter@ so it returns 'Nothing' for CertRBs whose certified +-- EB closure has not been downloaded yet. +-- +-- Same shape as 'ignoreInvalid', but generalised to 'Ord h' so the +-- block type does not need to be threaded through a proxy: the body +-- only uses 'Set.member', which needs no more than 'Ord' on the hash. +ignorePendingCertRBs :: + Ord h => + Set h -> + (h -> Maybe a) -> + (h -> Maybe a) +ignorePendingCertRBs pending getter hash + | Set.member hash pending = Nothing + | otherwise = getter hash + +-- | Volatile CertRBs whose certified EB closure is not in the local +-- downloaded-closures tracker, keyed by the missing EB's 'LeiosPoint' +-- (the value is the CertRB's own header hash). Candidate construction +-- hides these blocks so that 'resolveLeiosBlock' is never asked to +-- recover a closure we do not have; the 'LeiosPoint' keys let the +-- late-join fetch ('NodeKernel.pendingEbReconciler') pull exactly the +-- closures still needed. +-- +-- Walks the VolatileDB forward from the immutable tip via 'succsOf', +-- reading each header's Leios fields ('VolatileDB.getLeiosFields', which +-- snapshots 'headerIsCertRB' / 'headerEbAnnouncement' at parse time). A +-- CertRB certifies the EB announced by its /immediate parent/: +-- 'updateChainDepState' (in "Ouroboros.Consensus.Protocol.Praos") +-- overwrites the tracked announcement on every block, so the announcement +-- a CertRB certifies is exactly the one carried by the block before it. +-- The certified EB's 'LeiosPoint' is therefore @(parent slot, parent +-- announcement hash)@, matching the point 'resolveLeiosBlock' queries. +-- We carry the parent's slot+announcement down the walk and flag a CertRB +-- as pending iff that EB's closure is not in the completed-closure +-- snapshot. +computeCertRBsWithPendingEbClosures :: + forall m blk. + (IOLike m, ResolveLeiosBlock blk) => + VolatileDB m blk -> + (ChainHash blk -> Set (HeaderHash blk)) -> + -- | EBs whose closure is locally complete (see + -- 'LeiosDbHandle.readCompletedClosures'). + Set EbHash -> + -- | Where to start the walk. Should be the immutable-tip 'ChainHash'. + ChainHash blk -> + m (Map LeiosPoint (HeaderHash blk, BytesSize)) +computeCertRBsWithPendingEbClosures volDB succsOf acquiredClosures start = do + (leiosFields, blockInfo) <- + atomically $ + (,) + <$> VolatileDB.getLeiosFields volDB + <*> VolatileDB.getBlockInfo volDB + let + go :: + ChainHash blk -> + -- \^ the block (or anchor) whose successors we visit + Maybe (SlotNo, EbAnnouncement) -> + -- \^ that block's slot and EB announcement, if it announced one + Map LeiosPoint (HeaderHash blk, BytesSize) -> + Map LeiosPoint (HeaderHash blk, BytesSize) + go parent parentAnn acc0 = + foldl' visit acc0 (Set.toList (succsOf parent)) + where + visit acc h = go (BlockHash h) thisAnn acc' + where + (isCertRB, mAnn) = fromMaybe (NotCertRB, Nothing) (leiosFields h) + thisAnn = (,) <$> (VolatileDB.biSlotNo <$> blockInfo h) <*> mAnn + acc' + | CertRB <- isCertRB + , Just (parentSlot, ann) <- parentAnn + , let ebHash = ebAnnouncementHash ann + , ebHash `Set.notMember` acquiredClosures = + Map.insert (MkLeiosPoint parentSlot ebHash) (h, ebAnnouncementSize ann) acc + | otherwise = acc + pure $ go start Nothing Map.empty + where + -- The body reads the Leios fields snapshotted by the VolatileDB rather + -- than calling the class methods directly, but the constraint keeps + -- this function tied to block types that actually carry Leios headers. + _ = keepRedundantConstraint (Proxy @(ResolveLeiosBlock blk)) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index fa375e97cf..28397aa182 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -95,6 +95,8 @@ import Data.Typeable import Data.Void (Void) import Data.Word (Word64) import GHC.Generics (Generic) +import LeiosDemoDb.Common (LeiosDbHandle) +import LeiosDemoTypes (BytesSize, LeiosPoint) import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime.WallClock.Types (WithArrivalTime) @@ -382,6 +384,17 @@ data ChainDbEnv m blk = CDB -- ^ Information on the last starvation of ChainSel, whether ongoing or -- ended recently. , cdbPerasCertDB :: !(PerasCertDB m blk) + , cdbLeiosDbHandle :: !(LeiosDbHandle m) + -- ^ Exposes the closure cache ('readCompletedClosures'). Held as + -- the handle (not a 'LeiosDbConnection'): the cache is read + -- concurrently and connections are not thread-safe. + , cdbPendingEBs :: !(StrictTVar m (Map LeiosPoint BytesSize)) + -- ^ EB closures that chain selection is currently waiting on because a + -- volatile CertRB certifies them but the closure has not been fetched + -- yet, keyed by the EB's 'LeiosPoint' with its announced byte size. + -- Maintained by ChainSel ('computeCertRBsWithPendingEbClosures') and + -- exposed via 'getPendingCertRBs' so the late-join fetch knows which + -- closures to pull and with what expected size. } deriving Generic diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs index ea699a6d97..c65200f95d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs @@ -45,6 +45,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.Forker , AnnLedgerError (..) , AnnLedgerError' , ResolveBlock + , IsCertRB (..) , ResolveLeiosBlock (..) , SuccessForkerAction (..) , ValidateArgs (..) @@ -70,7 +71,7 @@ import qualified Data.Set as Set import Data.Word import GHC.Generics import LeiosDemoDb (LeiosDbConnection) -import LeiosDemoTypes (LeiosPoint) +import LeiosDemoTypes (EbAnnouncement, IsCertRB (..), LeiosPoint) import NoThunks.Class import Ouroboros.Consensus.Block import Ouroboros.Consensus.HeaderValidation (headerStateChainDep) @@ -576,6 +577,13 @@ class ResolveLeiosBlock blk where headerLeiosAnnouncement :: Header blk -> Maybe LeiosPoint headerLeiosAnnouncement _ = Nothing + -- | Whether this header denotes a CertRB (a block that certifies a + -- previously-announced Leios EB). + headerIsCertRB :: Header blk -> IsCertRB + + -- | The Leios EB announcement carried by this header, if any. + headerEbAnnouncement :: Header blk -> Maybe EbAnnouncement + {------------------------------------------------------------------------------- Validation -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/API.hs index 85fd1b0086..70a21ef914 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/API.hs @@ -37,6 +37,7 @@ import Data.Typeable (Typeable) import Data.Word (Word16) import GHC.Generics (Generic) import GHC.Stack (HasCallStack) +import LeiosDemoTypes (EbAnnouncement, IsCertRB) import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Storage.Common (BlockComponent (..)) @@ -83,6 +84,16 @@ data VolatileDB m blk = VolatileDB -- ^ Return a function that returns the 'BlockInfo' of the block with -- the given hash or 'Nothing' if the block is not found in the -- VolatileDB. + , getLeiosFields :: + HasCallStack => + STM m (HeaderHash blk -> Maybe (IsCertRB, Maybe EbAnnouncement)) + -- ^ Return a function that returns the per-header Leios fields of the + -- block with the given hash, or 'Nothing' if the block is not found + -- in the VolatileDB. + -- + -- The result pairs 'headerIsCertRB' with 'headerEbAnnouncement' for + -- the same header. Kept separate from 'getBlockInfo' so that + -- 'BlockInfo' stays free of Leios-specific fields. , garbageCollect :: HasCallStack => SlotNo -> m () -- ^ Try to remove all blocks with a slot number less than the given -- one. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl.hs index 7bf568a933..0ee4dcbde0 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl.hs @@ -139,8 +139,10 @@ import qualified Data.Set as Set import qualified Data.Text as Text import Data.Word (Word64) import GHC.Stack (HasCallStack) +import LeiosDemoTypes (EbAnnouncement, IsCertRB) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Storage.Common (BlockComponent (..)) +import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) import Ouroboros.Consensus.Storage.Serialisation import Ouroboros.Consensus.Storage.VolatileDB.API import Ouroboros.Consensus.Storage.VolatileDB.Impl.FileInfo (FileInfo) @@ -205,6 +207,7 @@ openDB :: ( HasCallStack , IOLike m , GetPrevHash blk + , ResolveLeiosBlock blk , VolatileDbSerialiseConstraints blk ) => Complete VolatileDbArgs m blk -> @@ -237,6 +240,7 @@ openDB VolatileDbArgs{volHasFS = SomeHasFS hasFS, ..} = do , garbageCollect = garbageCollectImpl env , filterByPredecessor = filterByPredecessorImpl env , getBlockInfo = getBlockInfoImpl env + , getLeiosFields = getLeiosFieldsImpl env , getMaxSlotNo = getMaxSlotNoImpl env } return volatileDB @@ -382,6 +386,7 @@ putBlockImpl :: , EncodeDisk blk blk , HasBinaryBlockInfo blk , HasNestedContent Header blk + , ResolveLeiosBlock blk , IOLike m ) => VolatileDBEnv m blk -> @@ -402,6 +407,7 @@ putBlockImpl when fileIsFull $ nextFile hasFS where blockInfo@BlockInfo{biHash, biSlotNo, biPrevHash} = extractBlockInfo blk + leiosFields = extractLeiosFields blk updateStateAfterWrite :: forall h. @@ -429,6 +435,7 @@ putBlockImpl , ibiBlockInfo = blockInfo , ibiNestedCtxt = case unnest (getHeader blk) of DepPair nestedCtxt _ -> SomeSecond nestedCtxt + , ibiLeiosFields = leiosFields } currentRevMap' = Map.insert biHash internalBlockInfo' currentRevMap st' = @@ -561,6 +568,14 @@ getBlockInfoImpl :: getBlockInfoImpl = getterSTM $ \st hash -> ibiBlockInfo <$> Map.lookup hash (currentRevMap st) +getLeiosFieldsImpl :: + forall m blk. + (IOLike m, HasHeader blk) => + VolatileDBEnv m blk -> + STM m (HeaderHash blk -> Maybe (IsCertRB, Maybe EbAnnouncement)) +getLeiosFieldsImpl = getterSTM $ \st hash -> + ibiLeiosFields <$> Map.lookup hash (currentRevMap st) + getMaxSlotNoImpl :: forall m blk. (IOLike m, HasHeader blk) => diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Parser.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Parser.hs index fdac506a58..74ab8340bd 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Parser.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Parser.hs @@ -11,13 +11,18 @@ module Ouroboros.Consensus.Storage.VolatileDB.Impl.Parser -- * Auxiliary , extractBlockInfo + , extractLeiosFields ) where import qualified Cardano.Ledger.Binary.Plain as Plain import Data.Bifunctor (bimap) import qualified Data.ByteString.Lazy as Lazy import Data.Word (Word64) +import LeiosDemoTypes (EbAnnouncement, IsCertRB) import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Storage.LedgerDB + ( ResolveLeiosBlock (headerEbAnnouncement, headerIsCertRB) + ) import Ouroboros.Consensus.Storage.Serialisation import Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo (..)) import Ouroboros.Consensus.Storage.VolatileDB.Impl.Types @@ -43,6 +48,7 @@ data ParsedBlockInfo blk = ParsedBlockInfo , pbiBlockSize :: !BlockSize , pbiBlockInfo :: !(BlockInfo blk) , pbiNestedCtxt :: !(SomeSecond (NestedCtxt Header) blk) + , pbiLeiosFields :: !(IsCertRB, Maybe EbAnnouncement) } -- | Parse the given file containing blocks. @@ -56,6 +62,7 @@ parseBlockFile :: , HasBinaryBlockInfo blk , HasNestedContent Header blk , DecodeDisk blk (Lazy.ByteString -> Either Plain.DecoderError blk) + , ResolveLeiosBlock blk ) => CodecConfig blk -> HasFS m h -> @@ -90,6 +97,7 @@ parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath = Right ((offset, (size, blk)), stream') | noValidation || isNotCorrupt blk -> let !blockInfo = extractBlockInfo blk + !leiosFields = extractLeiosFields blk !newParsed = ParsedBlockInfo { pbiBlockOffset = BlockOffset offset @@ -97,6 +105,7 @@ parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath = , pbiBlockInfo = blockInfo , pbiNestedCtxt = case unnest (getHeader blk) of DepPair nestedCtxt _ -> SomeSecond nestedCtxt + , pbiLeiosFields = leiosFields } in checkEntries (newParsed : parsed) stream' | otherwise -> -- The block was invalid @@ -126,3 +135,11 @@ extractBlockInfo blk = } where BinaryBlockInfo{headerOffset, headerSize} = getBinaryBlockInfo blk + +extractLeiosFields :: + (HasHeader blk, GetHeader blk, ResolveLeiosBlock blk) => + blk -> + (IsCertRB, Maybe EbAnnouncement) +extractLeiosFields blk = (headerIsCertRB hdr, headerEbAnnouncement hdr) + where + hdr = getHeader blk diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/State.hs index a976e19092..36d3ff15b3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/State.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/State.hs @@ -53,6 +53,7 @@ import Data.Word (Word64) import GHC.Generics (Generic) import GHC.Stack import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Storage.LedgerDB (ResolveLeiosBlock) import Ouroboros.Consensus.Storage.Serialisation import Ouroboros.Consensus.Storage.VolatileDB.API import qualified Ouroboros.Consensus.Storage.VolatileDB.Impl.FileInfo as FileInfo @@ -308,6 +309,7 @@ mkOpenState :: , HasBinaryBlockInfo blk , HasNestedContent Header blk , DecodeDisk blk (Lazy.ByteString -> Either Plain.DecoderError blk) + , ResolveLeiosBlock blk ) => CodecConfig blk -> HasFS m h -> @@ -360,6 +362,7 @@ mkOpenStateHelper :: , HasBinaryBlockInfo blk , HasNestedContent Header blk , DecodeDisk blk (Lazy.ByteString -> Either Plain.DecoderError blk) + , ResolveLeiosBlock blk ) => CodecConfig blk -> HasFS m h -> @@ -496,6 +499,7 @@ addToReverseIndex file = \revMap -> go revMap [] , pbiBlockSize = size , pbiBlockInfo = blockInfo@BlockInfo{biHash} , pbiNestedCtxt = nestedCtxt + , pbiLeiosFields = leiosFields } = parsedBlock internalBlockInfo = InternalBlockInfo @@ -504,6 +508,7 @@ addToReverseIndex file = \revMap -> go revMap [] , ibiBlockSize = size , ibiBlockInfo = blockInfo , ibiNestedCtxt = nestedCtxt + , ibiLeiosFields = leiosFields } -- \| Insert the value at the key returning the updated map, unless there diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Types.hs index 84ed2f6c0d..b529151e09 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/VolatileDB/Impl/Types.hs @@ -32,6 +32,7 @@ import Data.Map.Strict (Map) import Data.Set (Set) import Data.Word (Word32, Word64) import GHC.Generics (Generic) +import LeiosDemoTypes (EbAnnouncement, IsCertRB) import NoThunks.Class (NoThunks) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo) @@ -130,5 +131,9 @@ data InternalBlockInfo blk = InternalBlockInfo , ibiBlockSize :: !BlockSize , ibiBlockInfo :: !(BlockInfo blk) , ibiNestedCtxt :: !(SomeSecond (NestedCtxt Header) blk) + , ibiLeiosFields :: !(IsCertRB, Maybe EbAnnouncement) + -- ^ Per-header Leios fields, snapshot once at parse time. Exposed + -- via 'getLeiosFields'. Kept here rather than on 'BlockInfo' so + -- that the public 'BlockInfo' record stays Leios-free. } deriving (Generic, NoThunks) diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Ouroboros/Storage/TestBlock.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Ouroboros/Storage/TestBlock.hs index aeb9aa9886..ffb50b19bc 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Ouroboros/Storage/TestBlock.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Ouroboros/Storage/TestBlock.hs @@ -145,7 +145,9 @@ data TestBlock = TestBlock deriving anyclass (NFData, NoThunks, Serialise) -- | Default 'ResolveLeiosBlock' — storage TestBlock never carries Leios certs. -instance ResolveLeiosBlock TestBlock +instance ResolveLeiosBlock TestBlock where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing -- | Hash of a 'TestHeader' newtype TestHeaderHash = TestHeaderHash Int diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/TestBlock.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/TestBlock.hs index c944fa5cf9..25ded904b4 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/TestBlock.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/TestBlock.hs @@ -320,7 +320,9 @@ isStrictDescendentOf b1 b2 = b1 `isDescendentOf` b2 && b1 /= b2 instance ShowProxy TestBlock -- | Default 'ResolveLeiosBlock' — test blocks never carry Leios certificates. -instance Typeable ptype => ResolveLeiosBlock (TestBlockWith ptype) +instance Typeable ptype => ResolveLeiosBlock (TestBlockWith ptype) where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing newtype instance Header (TestBlockWith ptype) = TestHeader {testHeader :: TestBlockWith ptype} diff --git a/ouroboros-consensus/src/unstable-mock-block/Ouroboros/Consensus/Mock/Ledger/Block.hs b/ouroboros-consensus/src/unstable-mock-block/Ouroboros/Consensus/Mock/Ledger/Block.hs index f82493e620..f8efb01e29 100644 --- a/ouroboros-consensus/src/unstable-mock-block/Ouroboros/Consensus/Mock/Ledger/Block.hs +++ b/ouroboros-consensus/src/unstable-mock-block/Ouroboros/Consensus/Mock/Ledger/Block.hs @@ -138,6 +138,9 @@ data SimpleBlock' c ext ext' = SimpleBlock instance (Typeable c, Typeable ext, Typeable ext') => ResolveLeiosBlock (SimpleBlock' c ext ext') + where + headerIsCertRB _ = NotCertRB + headerEbAnnouncement _ = Nothing instance (HashAlgorithm (SimpleHash c), Typeable c, Typeable ext, Serialise ext') => diff --git a/ouroboros-consensus/test/consensus-test/Test/LeiosDemoDb.hs b/ouroboros-consensus/test/consensus-test/Test/LeiosDemoDb.hs index 8c2854cdb0..731fa2ae65 100644 --- a/ouroboros-consensus/test/consensus-test/Test/LeiosDemoDb.hs +++ b/ouroboros-consensus/test/consensus-test/Test/LeiosDemoDb.hs @@ -14,7 +14,7 @@ import Cardano.Slotting.Slot (SlotNo (..)) import Control.Concurrent.Class.MonadSTM.Strict (atomically, readTChan, tryReadTChan) import Control.DeepSeq (force) import Control.Exception (bracket) -import Control.Monad (forM, forM_, replicateM) +import Control.Monad (forM, forM_, replicateM, void) import Control.Monad.Class.MonadTime.SI (diffTime, getMonotonicTime) import qualified Data.ByteString as BS import Data.Function ((&)) @@ -366,7 +366,7 @@ prop_txsInsertThenRetrieve impl = ioProperty $ withFreshDb impl $ \db -> withLeiosDb db $ \con -> do -- Insert the EB first (point then body) leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb -- Get the txHashes from the EB for the offsets we want to insert let ebTxList = V.toList (leiosEbTxs eb) !txsToInsert = @@ -426,7 +426,7 @@ test_singleSubscriber db = do eb = mkTestEb 3 withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + void $ leiosDbInsertEbBody con point eb notification <- atomically $ readTChan chan case notification of AcquiredEb notifPoint _ -> @@ -444,7 +444,7 @@ test_multipleSubscribers db = do eb = mkTestEb 5 withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + void $ leiosDbInsertEbBody con point eb -- All subscribers should receive the notification notif1 <- atomically $ readTChan chan1 notif2 <- atomically $ readTChan chan2 @@ -462,7 +462,7 @@ test_correctData db = do expectedSize = leiosEbBytesSize eb withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + void $ leiosDbInsertEbBody con point eb notification <- atomically $ readTChan chan case notification of AcquiredEb notifPoint notifSize -> do @@ -481,7 +481,7 @@ test_lateSubscriber db = do eb1 = mkTestEb 2 withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point1 (leiosEbBytesSize eb1) - leiosDbInsertEbBody con point1 eb1 + void $ leiosDbInsertEbBody con point1 eb1 -- Now subscribe chan <- subscribeEbNotifications db -- The channel should be empty (no past notifications) @@ -494,7 +494,7 @@ test_lateSubscriber db = do eb2 = mkTestEb 3 withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point2 (leiosEbBytesSize eb2) - leiosDbInsertEbBody con point2 eb2 + void $ leiosDbInsertEbBody con point2 eb2 notification <- atomically $ readTChan chan assertOfferBlock point2 notification @@ -511,7 +511,7 @@ test_multipleNotifications db = do withLeiosDb db $ \con -> forM_ (zip points ebs) $ \(point, eb) -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + void $ leiosDbInsertEbBody con point eb -- Read all notifications and verify order notifications <- replicateM 5 (atomically $ readTChan chan) mapM_ @@ -528,8 +528,8 @@ test_noOfferBlockTxsBeforeComplete db = do ebTxList = V.toList (leiosEbTxs eb) withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb - -- Consume the LeiosOfferBlock notification + _ <- leiosDbInsertEbBody con point eb + -- Consume the 'AcquiredEb' notification _ <- atomically $ readTChan chan -- Insert only 2 of 3 txs (by txHash) let txsToInsert = @@ -537,7 +537,7 @@ test_noOfferBlockTxsBeforeComplete db = do | (i, (txHash, _size)) <- zip [0 :: Int, 1] ebTxList ] _ <- leiosDbInsertTxs con txsToInsert - -- No LeiosOfferBlockTxs notification should be available + -- No 'AcquiredEbTxs' notification should be available maybeNotif <- atomically $ tryReadTChan chan case maybeNotif of Nothing -> pure () @@ -554,8 +554,8 @@ test_offerBlockTxs db = do withLeiosDb db $ \con -> do -- Insert the EB (point then body) leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb - -- Consume the LeiosOfferBlock notification + _ <- leiosDbInsertEbBody con point eb + -- Consume the 'AcquiredEb' notification _ <- atomically $ readTChan chan -- Insert all txs (by txHash) let txsToInsert = @@ -578,7 +578,7 @@ test_noReNotifyCompletedEbs db = do withLeiosDb db $ \con -> do -- Insert and complete the EB leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb -- Consume the AcquiredEb notification acquiredEb <- atomically $ tryReadTChan chan case acquiredEb of @@ -644,7 +644,7 @@ prop_fetchWorkMissingTxs impl = withLeiosDb db $ \con -> do -- Insert point and body, but no txs leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb (work, queryTime) <- timed $ leiosDbQueryFetchWork con let expectedTxCount = numTxs pure $ @@ -676,7 +676,7 @@ prop_fetchWorkCompleteTxs impl = withLeiosDb db $ \con -> do -- Insert point, body, and all txs leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb let ebTxList = V.toList (leiosEbTxs eb) txsToInsert = [ (txHash, baseTxBytes) @@ -735,7 +735,7 @@ prop_filterEbBodiesCorrect impl = -- Insert some EBs (those in toInsert will have bodies) forM_ toInsert $ \(point, eb) -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + void $ leiosDbInsertEbBody con point eb -- Also insert points without bodies for the rest let withoutBodies = filter (`notElem` toInsert) pointsAndEbs forM_ withoutBodies $ \(point, eb) -> @@ -794,7 +794,7 @@ prop_completedEbComplete impl = forAllBlind genTxBytes $ \txBytes -> ioProperty $ withFreshDb impl $ \db -> withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb let ebTxList = V.toList (leiosEbTxs eb) txsToInsert = [(txHash, txBytes) | (txHash, _size) <- ebTxList] _ <- leiosDbInsertTxs con txsToInsert @@ -824,7 +824,7 @@ prop_completedEbMissingTxs impl = forAllBlind (genPointAndEb numTxs) $ \(point, eb) -> ioProperty $ withFreshDb impl $ \db -> withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb (result, queryTime) <- timed $ leiosDbQueryCompletedEbByPoint con point pure $ result === Nothing @@ -840,7 +840,7 @@ prop_completedEbPartialTxs impl = forAllBlind genTxBytes $ \txBytes -> ioProperty $ withFreshDb impl $ \db -> withLeiosDb db $ \con -> do leiosDbInsertEbPoint con point (leiosEbBytesSize eb) - leiosDbInsertEbBody con point eb + _ <- leiosDbInsertEbBody con point eb -- Insert only the first half of txs, leaving at least one missing let ebTxList = V.toList (leiosEbTxs eb) partialTxs = take (numTxs `div` 2) ebTxList diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/VolatileDB/Mock.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/VolatileDB/Mock.hs index b9874c9415..966ea27175 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/VolatileDB/Mock.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/VolatileDB/Mock.hs @@ -42,6 +42,7 @@ openDBMock maxBlocksPerFile ccfg = do , garbageCollect = updateE_ . garbageCollectModel , filterByPredecessor = querySTME $ filterByPredecessorModel , getBlockInfo = querySTME $ getBlockInfoModel + , getLeiosFields = pure (const Nothing) , getMaxSlotNo = querySTME $ getMaxSlotNoModel } where