Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Ouroboros.Network.BlockFetch
Description
Let's start with the big picture...
Key: ┏━━━━━━━━━━━━┓ ╔═════════════╗ ┏━━━━━━━━━━━━━━┓ ╔════════════╗ ┃ STM-based ┃ ║active thread║ ┃state instance┃┓ ║ one thread ║╗ ┃shared state┃ ║ ║ ┃ per peer ┃┃ ║ per peer ║║ ┗━━━━━━━━━━━━┛ ╚═════════════╝ ┗━━━━━━━━━━━━━━┛┃ ╚════════════╝║ ┗━━━━━━━━━━━━━━┛ ╚════════════╝
╔═════════════╗ ┏━━━━━━━━━━━━━┓ ║ Chain sync ║╗ ┃ Ledger ┃ ║ protocol ║║◀───┨ state ┃◀───────────╮ ║(client side)║║ ┃ ┃ │ ╚══════╤══════╝║ ┗━━━━━━━━━━━━━┛ │ ╚═════╪═══════╝ │ ▼ │ ┏━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━┓ ╔══════╧══════╗ ┃ Candidate ┃ ┃ Set of ┃ ║ Chain and ║ ┃ chains ┃ ┃ downloaded ┠────▶║ ledger ║ ┃ (headers) ┃ ┃ blocks ┃ ║ validation ║ ┗━━━━━┯━━━━━━━┛ ┗━━━━━┯━━━━━━━┛ ╚══════╤══════╝ │ │ ▲ │ │ ╭─────────────────╯ │ │ ░░░░░░░░▼░▼░░░░░░░░ │ ▼ ░░╔═════════════╗░░ │ ┏━━━━━━━━━━━━━┓ ╔═════════════╗ ░░║ Block ║░░ │ ┃ Current ┃ ║ Block fetch ║╗ ░░╢ fetch ║◀────────────┼───────────┨ chain ┠────▶║ protocol ║║ ░░║ logic ║░░ │ ┃ (blocks) ┃ ║(server side)║║ ░░╚═════════════╝░░ │ ┠─────────────┨ ╚═════════════╝║ ░░░░░░░░░▲░░░░░░░░░ │ ┃ Tentative ┃ ╚═════════════╝ ░░░░░░░░░▼░░░░░░░░░░░░░░░░░░░░│░░░░░░░░ ┃ chain ┠──╮ ░░┏━━━━━━━━━━━━━┓░░░░░╔═══════╧═════╗░░ ┃ (headers) ┃ │ ╔═════════════╗ ░░┃ Block fetch ┃┓░░░░║ block fetch ║╗░ ┗━━━━━━━━━━━━━┛ │ ║ Chain sync ║╗ ░░┃ state and ┃┃◀──▶║ protocol ║║░ ╰─▶║ protocol ║║ ░░┃ requests ┃┃░░░░║(client side)║║░ ║(server side)║║ ░░┗━━━━━━━━━━━━━┛┃░░░░╚═════════════╝║░ ╚═════════════╝║ ░░░┗━━━━━━━━━━━━━┛░░░░░╚═════════════╝░ ╚═════════════╝ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
Notes:
- Thread communication is via STM based state.
- Outbound: threads update STM state.
- Inbound: threads wait on STM state changing (using retry).
- These are no queues: there is only the current state, not all change events.
We consider the block fetch logic and the policy for the block fetch protocol client together as one unit of functionality. This is the shaded area in the diagram.
Looking at the diagram we see that these two threads interact with each other and other threads via the following shared state
State | Interactions | Internal/External |
---|---|---|
Candidate chains (headers) | Read | External |
Current chain (blocks) | Read | External |
Set of downloaded blocks | Read & Write | External |
Block fetch requests | Read & Write | Internal |
The block fetch requests state is private between the block fetch logic and the block fetch protocol client, so it is implemented here.
The other state is managed by the consensus layer and is considered external here. So here we define interfaces for interacting with the external state. These have to be provided when instantiating the block fetch logic.
Synopsis
- blockFetchLogic :: forall addr header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadSTM m, Ord addr, Hashable addr) => Tracer m [TraceLabelPeer addr (FetchDecision [Point header])] -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m -> BlockFetchConfiguration -> m Void
- data BlockFetchConfiguration = BlockFetchConfiguration {
- bfcMaxConcurrencyBulkSync :: !Word
- bfcMaxConcurrencyDeadline :: !Word
- bfcMaxRequestsInflight :: !Word
- bfcDecisionLoopInterval :: !DiffTime
- bfcSalt :: !Int
- data BlockFetchConsensusInterface peer header block (m :: Type -> Type) = BlockFetchConsensusInterface {
- readCandidateChains :: STM m (Map peer (AnchoredFragment header))
- readCurrentChain :: STM m (AnchoredFragment header)
- readFetchMode :: STM m FetchMode
- readFetchedBlocks :: STM m (Point block -> Bool)
- mkAddFetchedBlock :: WhetherReceivingTentativeBlocks -> STM m (Point block -> block -> m ())
- readFetchedMaxSlotNo :: STM m MaxSlotNo
- plausibleCandidateChain :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Bool
- compareCandidateChains :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Ordering
- blockFetchSize :: header -> SizeInBytes
- blockMatchesHeader :: header -> block -> Bool
- headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
- blockForgeUTCTime :: FromConsensus block -> STM m UTCTime
- type FetchDecision result = Either FetchDecline result
- data TraceFetchClientState header
- = AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | AcknowledgedFetchRequest (FetchRequest header)
- | SendFetchRequest (AnchoredFragment header) PeerGSV
- | StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes
- | CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | ClientTerminating Int
- data TraceLabelPeer peerid a = TraceLabelPeer peerid a
- data FetchClientRegistry peer header block m
- newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m)
- bracketFetchClient :: forall m a peer header block version. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> version -> (version -> WhetherReceivingTentativeBlocks) -> peer -> (FetchClientContext header block m -> m a) -> m a
- bracketSyncWithFetchClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadCatch m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a
- bracketKeepAliveClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
- data FetchMode
- newtype FromConsensus a = FromConsensus {
- unFromConsensus :: a
- data SizeInBytes
- data WhetherReceivingTentativeBlocks
Documentation
blockFetchLogic :: forall addr header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadSTM m, Ord addr, Hashable addr) => Tracer m [TraceLabelPeer addr (FetchDecision [Point header])] -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m -> BlockFetchConfiguration -> m Void Source #
Execute the block fetch logic. It monitors the current chain and candidate chains. It decided which block bodies to fetch and manages the process of fetching them, including making alternative decisions based on timeouts and failures.
This runs forever and should be shut down using mechanisms such as async.
data BlockFetchConfiguration Source #
Configuration for FetchDecisionPolicy. Should be determined by external local node config.
Constructors
BlockFetchConfiguration | |
Fields
|
Instances
data BlockFetchConsensusInterface peer header block (m :: Type -> Type) #
Constructors
BlockFetchConsensusInterface | |
Fields
|
Tracer types
type FetchDecision result = Either FetchDecline result Source #
Throughout the decision making process we accumulate reasons to decline to fetch any blocks. This type is used to wrap intermediate and final results.
data TraceFetchClientState header Source #
Tracing types for the various events that change the state
(i.e. FetchClientStateVars
) for a block fetch client.
Note that while these are all state changes, the AddedFetchRequest
occurs
in the decision thread while the other state changes occur in the block
fetch client threads.
Constructors
AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | The block fetch decision thread has added a new fetch instruction consisting of one or more individual request ranges. |
AcknowledgedFetchRequest (FetchRequest header) | Mark the point when the fetch client picks up the request added
by the block fetch decision thread. Note that this event can happen
fewer times than the |
SendFetchRequest (AnchoredFragment header) PeerGSV | Mark the point when fetch request for a fragment is actually sent over the wire. |
StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | Mark the start of receiving a streaming batch of blocks. This will
be followed by one or more |
CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes | Mark the completion of of receiving a single block within a streaming batch of blocks. |
CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | Mark the successful end of receiving a streaming batch of blocks |
RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | If the other peer rejects our request then we have this event
instead of |
ClientTerminating Int | The client is terminating. Log the number of outstanding requests. |
Instances
(StandardHash header, Show header) => Show (TraceFetchClientState header) Source # | |
Defined in Ouroboros.Network.BlockFetch.ClientState |
data TraceLabelPeer peerid a #
Constructors
TraceLabelPeer peerid a |
Instances
Bifunctor TraceLabelPeer | |
Defined in Network.Mux.Trace Methods bimap :: (a -> b) -> (c -> d) -> TraceLabelPeer a c -> TraceLabelPeer b d Source # first :: (a -> b) -> TraceLabelPeer a c -> TraceLabelPeer b c Source # second :: (b -> c) -> TraceLabelPeer a b -> TraceLabelPeer a c Source # | |
Functor (TraceLabelPeer peerid) | |
Defined in Network.Mux.Trace Methods fmap :: (a -> b) -> TraceLabelPeer peerid a -> TraceLabelPeer peerid b Source # (<$) :: a -> TraceLabelPeer peerid b -> TraceLabelPeer peerid a Source # | |
(Show peerid, Show a) => Show (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace | |
(Eq peerid, Eq a) => Eq (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace Methods (==) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool Source # (/=) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool Source # |
The FetchClientRegistry
data FetchClientRegistry peer header block m Source #
A registry for the threads that are executing the client side of the
BlockFetch
protocol to communicate with our peers.
The registry contains the shared variables we use to communicate with these threads, both to track their status and to provide instructions.
The threads add/remove themselves to/from this registry when they start up and shut down.
newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m) Source #
Arguments
:: forall m a peer header block version. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) | |
=> FetchClientRegistry peer header block m | |
-> version | |
-> (version -> WhetherReceivingTentativeBlocks) | is pipelining enabled function |
-> peer | |
-> (FetchClientContext header block m -> m a) | |
-> m a |
This is needed to start a block fetch client. It provides the required
FetchClientContext
. It registers and unregisters the fetch client on
start and end.
It also manages synchronisation with the corresponding chain sync client.
bracketSyncWithFetchClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadCatch m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a Source #
The block fetch and chain sync clients for each peer need to synchronise their startup and shutdown. This bracket operation provides that synchronisation for the chain sync client.
This must be used for the chain sync client outside of its own state registration and deregistration.
bracketKeepAliveClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a Source #
Re-export types used by BlockFetchConsensusInterface
Constructors
FetchModeBulkSync | |
FetchModeDeadline |
newtype FromConsensus a #
Constructors
FromConsensus | |
Fields
|
Instances
Applicative FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface Methods pure :: a -> FromConsensus a Source # (<*>) :: FromConsensus (a -> b) -> FromConsensus a -> FromConsensus b Source # liftA2 :: (a -> b -> c) -> FromConsensus a -> FromConsensus b -> FromConsensus c Source # (*>) :: FromConsensus a -> FromConsensus b -> FromConsensus b Source # (<*) :: FromConsensus a -> FromConsensus b -> FromConsensus a Source # | |
Functor FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface Methods fmap :: (a -> b) -> FromConsensus a -> FromConsensus b Source # (<$) :: a -> FromConsensus b -> FromConsensus a Source # |
data SizeInBytes #