186 changes: 186 additions & 0 deletions engine/access/ingestion2/engine.go
@@ -0,0 +1,186 @@
package ingestion2

import (
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/consensus/hotstuff"
	"github.com/onflow/flow-go/consensus/hotstuff/model"
	"github.com/onflow/flow-go/engine"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/counters"
	"github.com/onflow/flow-go/module/irrecoverable"
	"github.com/onflow/flow-go/module/jobqueue"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
)

// Engine ingests finalized and certified blocks from the consensus follower and
// feeds their execution results into the ResultsForest for indexing.
type Engine struct {
	component.Component
	cm *component.ComponentManager

	log   zerolog.Logger
	state protocol.State // used to access the protocol state

	blocks            storage.Blocks
	headers           storage.Headers
	executionReceipts storage.ExecutionReceipts
	executionResults  storage.ExecutionResults

	lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

	// job queue consumer for finalized blocks
	finalizedBlockConsumer *jobqueue.ComponentConsumer
	// notifier for the queue consumer
	finalizedBlockNotifier engine.Notifier

	resultsForest *ResultsForest
	maxForestSize uint

	collectionExecutedMetric module.CollectionExecutedMetric

	latestPersistedSealedResult *LatestPersistedSealedResult
}

var _ hotstuff.FinalizationConsumer = (*Engine)(nil)

// New constructs the ingestion engine and its component workers.
func New(
	log zerolog.Logger,
	latestPersistedSealedResult *LatestPersistedSealedResult,
) *Engine {
	resultsForest := NewResultsForest(log, latestPersistedSealedResult)

	e := &Engine{
		log:                         log.With().Str("component", "ingestion").Logger(),
		resultsForest:               resultsForest,
		latestPersistedSealedResult: latestPersistedSealedResult,
		finalizedBlockNotifier:      engine.NewNotifier(),
	}

	cm := component.NewComponentManagerBuilder().
		AddWorker(e.runForest).
		AddWorker(e.runForestLoader).
		Build()

	e.cm = cm
	e.Component = cm

	return e
}

// runForest is a component worker that starts the ResultsForest and signals
// readiness once the forest is ready.
func (e *Engine) runForest(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	e.resultsForest.Start(ctx)

	select {
	case <-ctx.Done():
	case <-e.resultsForest.Ready():
		ready()
	}

	<-e.resultsForest.Done()
}

// runForestLoader is a component worker that runs the ForestLoader once on
// startup to backfill the ResultsForest, after the forest is ready.
func (e *Engine) runForestLoader(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	loader := NewForestLoader(e.resultsForest, e.latestPersistedSealedResult.ResultID(), e.maxForestSize)

	select {
	case <-ctx.Done():
		return
	case <-e.resultsForest.Ready():
	}

	ready()
	if err := loader.Run(ctx); err != nil {
		ctx.Throw(err)
		return
	}
Comment on lines +94 to +97

Member:
don't we need to run this repeatedly?

Contributor Author (@peterargue), May 29, 2025:

no. the approach I took was to run the loader once on startup with these steps:

  1. Add sealed results up to a limit and continue adding until we reach the latest sealed
  2. Add all unsealed results at once.

Then it exits and results are added by the ingestion engine directly in real time.

We don't really need to continuously run the loader since we can just add all results into the forest and only start a subset to manage resources.
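
For illustration, a minimal sketch of that one-shot flow. The types and helper fields below (sketchForest, sealedBatches, unsealedResults) are assumptions made for this sketch, not the ForestLoader's actual API:

```go
// Hypothetical sketch of a one-shot loader run, per the approach described above.
package ingestion2

import (
	"context"
	"fmt"
)

// sketchEntry stands in for a (result, header) pair.
type sketchEntry struct{ id string }

// sketchForest stands in for the ResultsForest API used by the loader.
type sketchForest interface {
	AddResult(e sketchEntry, sealed bool) error
}

type sketchLoader struct {
	forest sketchForest

	// hypothetical data sources, injected so the sketch is self-contained
	sealedBatches   func() ([]sketchEntry, bool, error) // returns batch, done, error
	unsealedResults func() ([]sketchEntry, error)
}

// Run backfills the forest once at startup: sealed results in bounded batches
// until the latest seal is reached, then all unsealed results in a single pass.
// Afterwards it returns; new results arrive via the ingestion engine directly.
func (l *sketchLoader) Run(ctx context.Context) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		batch, done, err := l.sealedBatches()
		if err != nil {
			return fmt.Errorf("could not load sealed results: %w", err)
		}
		for _, e := range batch {
			if err := l.forest.AddResult(e, true); err != nil {
				return fmt.Errorf("could not add sealed result %s: %w", e.id, err)
			}
		}
		if done {
			break
		}
	}

	unsealed, err := l.unsealedResults()
	if err != nil {
		return fmt.Errorf("could not load unsealed results: %w", err)
	}
	for _, e := range unsealed {
		if err := l.forest.AddResult(e, false); err != nil {
			return fmt.Errorf("could not add unsealed result %s: %w", e.id, err)
		}
	}
	return nil
}
```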

Member:

👍 happy with the ForestLoader not needing to run regularly in the background.

A couple follow-up comments regarding the scenario you are describing above:

"a node that's been online for a while and decides to enable indexing"

  • In this case, I am worried there might be too many results to load all unsealed blocks into memory in one go (we produce about 100k blocks a day). When we talked about it earlier, I probably advocated for loading all blocks into memory ... but didn't think about your scenario. Sorry. I think the scenario you are bringing up is very valid. So what happens if not all results fit into the ResultsForest? I assume we would need to run the ForestLoader repeatedly, and then we would need a stopping criterion. You have something like that here, but I don't think we realistically can (or should, even if we could) add back-pressure on the consensus follower.
  • Furthermore, I am worried that this scenario might occur without the node being rebooted. Imagine the node operator messing up the networking configuration and the node not getting the newest blocks for a couple of days. Or the node being under-provisioned and falling progressively behind during periods with high execution load.

Overall, the following suggested approach is starting to manifest in my brain, which would address this larger class of problems in its entirety:

  • The ResultsForest has a capacity limit in a very specific way:
    • For now, I use height. (While I prefer view as an indicator, height is more illustrative for explaining this, I think. In addition, height creates less variability in the max size that the ResultsForest could grow to. Anyway.)
    • The ResultsForest already remembers the lowest view at which results can be added. Let's denote that "cut-off view" with κ. Results for blocks with smaller views are rejected. Pruning can only increase κ, never decrease it.
    • In various places, we use the axiom that the protocol will finalize at least one block every FinalizationSafetyThreshold views. Note that this is not guaranteed by the consensus protocol (formally, there is no upper bound), but in practice we assume that there will be human intervention. We could use a similar argument here and say that the ResultsForest accepts additions of results for blocks with view v if and only if κ ≤ v ≤ κ + ψ.
    • Here, ψ would play a similar role to the FinalizationSafetyThreshold. However, FinalizationSafetyThreshold is a protocol parameter which must be identical for all nodes; in contrast, ψ can be set by each AN operator independently. Anyway, let's say we set ψ = 250,000 views. This means that result indexing for an AN with this ψ value is only guaranteed to be live if the protocol always creates the next finalized seal within 250k views. That is 2 days of views, so I think that will hold in practice ... and if not, AN operators could reboot their nodes with larger values as a temporary emergency measure.
  • I completely agree with you that during normal operations, ResultsForest would always have capacity.
  • While new results are being added at the top of the tree, pruning at the bottom of indexed results with a finalized seal frees capacity.
  • Now imagine that the node progressively falls behind with indexing by more than ψ views.
    • ResultsForest would start rejecting results for block views beyond its horizon of κ + ψ.
    • When it rejects an addition, AddResult would return a sentinel error of a dedicated type. Let's denote this error as ForestHorizonExceededError.
    • When receiving newly incorporated blocks from the consensus follower, the Engine would try to add them to the ResultsForest and check for ForestHorizonExceededError (this code would need a minor extension).
    • In case of a ForestHorizonExceededError, the Engine knows that there could be gaps in the ResultsForest: incorporated blocks with higher views might be added again once indexing progresses and the ResultsForest is pruned, i.e. κ increases. But the results that were just rejected will need to be added later via the ForestLoader. We use an atomic Boolean NeedBackfilling to remember this.
    • As long as NeedBackfilling is true, we run the ForestLoader regularly (e.g. every 5 minutes).
    • Once the ForestLoader successfully executes this code, we know that the ForestLoader has added all remaining results into the ResultsForest and caught up with the consensus follower (caution: subtle happens-before requirement - I'll discuss that below). It sets NeedBackfilling to false, which means that the ForestLoader is not run anymore -- until the ResultsForest horizon is exceeded the next time (error ForestHorizonExceededError).

This is a lot of text explaining what the algorithm does. Sorry about this wall of text. Though, the algorithm is very simple to implement.
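
For illustration, a minimal sketch of the horizon check and backfill trigger described above. ForestHorizonExceededError, κ, ψ, and NeedBackfilling come from this proposal; all names and signatures here are assumptions, not code from this PR:

```go
// Hypothetical sketch of the κ/ψ horizon rule and the backfill flag.
package ingestion2

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// ForestHorizonExceededError signals that a result's block view is beyond κ + ψ.
type ForestHorizonExceededError struct {
	View, Horizon uint64
}

func (e ForestHorizonExceededError) Error() string {
	return fmt.Sprintf("view %d exceeds forest horizon %d", e.View, e.Horizon)
}

func IsForestHorizonExceededError(err error) bool {
	var target ForestHorizonExceededError
	return errors.As(err, &target)
}

// horizonForest sketches the capacity rule: accept results only for κ ≤ view ≤ κ + ψ.
type horizonForest struct {
	mu      sync.Mutex
	cutoff  uint64 // κ: lowest view still accepted; pruning only increases it
	horizon uint64 // ψ: operator-configurable window, e.g. 250,000 views
}

func (f *horizonForest) AddResultAtView(view uint64) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if view < f.cutoff {
		// below the cut-off: the result is rejected (already pruned); a dedicated
		// sentinel could be returned here, omitted in this sketch
		return nil
	}
	if view > f.cutoff+f.horizon {
		return ForestHorizonExceededError{View: view, Horizon: f.cutoff + f.horizon}
	}
	// ... insert into the levelled forest ...
	return nil
}

// onBlockIncorporated shows how the engine could flag backfilling when the
// horizon is exceeded; the ForestLoader is then re-run periodically until it
// succeeds and clears the flag.
func onBlockIncorporated(f *horizonForest, needBackfilling *atomic.Bool, view uint64) error {
	err := f.AddResultAtView(view)
	if IsForestHorizonExceededError(err) {
		needBackfilling.Store(true)
		return nil // not fatal: the loader will backfill the gap later
	}
	return err
}
```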

Member:

What remains is to prove that this algorithm will always make progress (liveness proof), I'll provide a sketch:

Liveness Proof 🚧 under construction 🚧

The current implementation of the consensus follower guarantees the following happens-before relation (in more intuitive terms: causality):

  1. First, incorporated blocks are persisted into the database.
  2. The database write happens before the OnBlockIncorporated consumers are called.

Prerequisite:

  • (a) The ForestLoader progresses up to a view x in line 103:

        entries, err := l.getUnsealedResults(previousHeader, finalizedHeader)
        if err != nil {
            return fmt.Errorf("could not get unsealed results: %w", err)
        }
        for _, re := range entries {
            err := l.forest.AddResult(re.result, re.header, false)
            if err != nil {
                return fmt.Errorf("could not add result %s to forest: %w", re.result.ID(), err)
            }
        }

    [diagram: Access Node ResultForest Proof]

🚧 continue this later 🚧

}

// OnFinalizedBlock is called by the follower engine after a block has been finalized and
// the state has been updated. Receives events from the finalization distributor.
func (e *Engine) OnFinalizedBlock(*model.Block) {
	// Per the specification of `hotstuff.FinalizationConsumer`, consumers of the `OnFinalizedBlock` notification must
	// be non-blocking. This code runs on the hot path of consensus and should induce as little overhead as possible.
	//
	// The input is coming from the node-internal consensus follower, which is a trusted component. Hence, we don't
	// need to verify the inputs and can queue them directly for processing by one of the engine's workers.
	e.finalizedBlockNotifier.Notify()
}

// OnBlockIncorporated is called by the follower engine after a block has been certified and the state has been updated.
// Receives block incorporated events from the finalization distributor.
func (e *Engine) OnBlockIncorporated(hotstuffBlock *model.Block) {
	// Per the specification of `hotstuff.FinalizationConsumer`, consumers of the `OnBlockIncorporated` notification must
	// be non-blocking. This code runs on the hot path of consensus and should induce as little overhead as possible.
	//
	// The input is coming from the node-internal consensus follower, which is a trusted component. Hence, we don't
	// need to verify the inputs and can queue them directly for processing by one of the engine's workers.

	// TODO: queue the incoming incorporated hotstuffBlock for processing in a dedicated pipeline.
	// The thread picking up the hotstuffBlock would then convert it to a `flow.Block` and process it further:
	//
	//   block, err := e.blocks.ByID(hotstuffBlock.BlockID)
	//   if err != nil {
	//       return irrecoverable.NewExceptionf("received incorporated block %s from consensus follower, but failed to retrieve full block: %w", hotstuffBlock.BlockID, err)
	//   }
	//   err = e.processCertifiedBlock(block)
	//   ...
}

// processCertifiedBlock adds results from the certified block to the results forest.
// No errors are expected during normal operation.
func (e *Engine) processCertifiedBlock(block *flow.Block) error {
	for _, result := range block.Payload.Results {
		if err := e.resultsForest.AddResult(result, block.Header, false); err != nil {
			return fmt.Errorf("could not add result %s to forest: %w", result.ID(), err)
		}
	}
	return nil
}

// processFinalizedBlock handles an incoming finalized block.
// It processes the block, indexes it for further processing, and requests missing collections if necessary.
//
// Expected errors during normal operation:
//   - storage.ErrNotFound - if the last full block height does not exist in the database.
//   - storage.ErrAlreadyExists - if a collection within the block or an execution result ID already exists in the database.
//   - generic error in case of unexpected failure from the database layer, or failure
//     to decode an existing database value.
func (e *Engine) processFinalizedBlock(block *flow.Block) error {
	// index the block for each of its collection guarantees
	err := e.blocks.IndexBlockForCollections(block.Header.ID(), flow.GetIDs(block.Payload.Guarantees))
	if err != nil {
		return fmt.Errorf("could not index block for collections: %w", err)
	}

	// index sealed results and notify the results forest
	for _, seal := range block.Payload.Seals {
		if err := e.executionResults.Index(seal.BlockID, seal.ResultID); err != nil {
			return fmt.Errorf("could not index block for execution result (id: %s): %w", seal.ResultID, err)
		}

		if err := e.resultsForest.OnResultSealed(seal.ResultID); err != nil {
			return fmt.Errorf("could not notify results forest of newly sealed result (id: %s): %w", seal.ResultID, err)
		}
	}
Comment on lines +163 to +166

Member:

I like that you have worker threads in the Engine, I think it would be good to have them serve the resultsForest.


	e.collectionExecutedMetric.BlockFinalized(block)

	return nil
}

// handleExecutionReceipt persists the execution receipt locally.
// It stores the execution receipt and updates the collection executed metric.
//
// No errors are expected during normal operation.
func (e *Engine) handleExecutionReceipt(receipt *flow.ExecutionReceipt) error {
	// persist the execution receipt locally; storing will also index the receipt
	err := e.executionReceipts.Store(receipt)
Comment on lines +178 to +179

Member:
I think execution receipts are already persisted and indexed by the consensus follower. Is the AN doing extra stuff when storing the receipt?

Contributor Author:
this is copied from the existing ingestion engine and needs to be updated.

currently, we just store all receipts received from ENs immediately. This will be skipped for now and eventually we could use the size limited cache.
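
For illustration, a minimal sketch of what a size-limited receipt cache could look like. This is a hand-rolled FIFO for clarity; the real change would more likely reuse an existing bounded cache in flow-go:

```go
// Hypothetical sketch of a size-limited execution receipt cache.
package ingestion2

import (
	"container/list"
	"sync"

	"github.com/onflow/flow-go/model/flow"
)

// receiptCache keeps at most capacity receipts, evicting the oldest first.
type receiptCache struct {
	mu       sync.Mutex
	capacity int
	order    *list.List // of flow.Identifier, front = oldest
	byID     map[flow.Identifier]*flow.ExecutionReceipt
}

func newReceiptCache(capacity int) *receiptCache {
	return &receiptCache{
		capacity: capacity,
		order:    list.New(),
		byID:     make(map[flow.Identifier]*flow.ExecutionReceipt, capacity),
	}
}

// Add inserts the receipt, evicting the oldest entry when the cache is full.
func (c *receiptCache) Add(receipt *flow.ExecutionReceipt) {
	c.mu.Lock()
	defer c.mu.Unlock()

	id := receipt.ID()
	if _, ok := c.byID[id]; ok {
		return // already cached
	}
	if c.order.Len() >= c.capacity {
		oldest := c.order.Remove(c.order.Front()).(flow.Identifier)
		delete(c.byID, oldest)
	}
	c.order.PushBack(id)
	c.byID[id] = receipt
}
```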

	if err != nil {
		return fmt.Errorf("failed to store execution receipt: %w", err)
	}

	e.collectionExecutedMetric.ExecutionReceiptReceived(receipt)
	return nil
}
52 changes: 52 additions & 0 deletions engine/access/ingestion2/execution_result_container.go
@@ -0,0 +1,52 @@
package ingestion2

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	pipeline "github.com/onflow/flow-go/module/executiondatasync/optimistic_syncing"
	"github.com/onflow/flow-go/module/forest"
)

var _ forest.Vertex = (*ExecutionResultContainer)(nil)

// ExecutionResultContainer represents an ExecutionResult within the LevelledForest.
// Implements LevelledForest's Vertex interface.
type ExecutionResultContainer struct {
	result      *flow.ExecutionResult
	resultID    flow.Identifier // precomputed ID of result to avoid expensive hashing on each call
	blockHeader *flow.Header    // header of the block which the result is for
	pipeline    pipeline.Pipeline
}

// NewExecutionResultContainer instantiates an ExecutionResultContainer for the given
// result, block header, and pipeline.
// No errors are expected during normal operation.
func NewExecutionResultContainer(
	result *flow.ExecutionResult,
	header *flow.Header,
	pipeline pipeline.Pipeline,
) (*ExecutionResultContainer, error) {
	// sanity check: the result must be for the given block
	if header.ID() != result.BlockID {
		return nil, fmt.Errorf("result is for a different block than the provided header")
	}

	return &ExecutionResultContainer{
		result:      result,
		resultID:    result.ID(),
		blockHeader: header,
		pipeline:    pipeline,
	}, nil
}

// VertexID returns the ExecutionResult ID of the ExecutionResultContainer.
func (c *ExecutionResultContainer) VertexID() flow.Identifier { return c.resultID }

// Level returns the View of the block the ExecutionResult is associated with.
func (c *ExecutionResultContainer) Level() uint64 { return c.blockHeader.View }

// Parent returns the ID and view of the parent result.
func (c *ExecutionResultContainer) Parent() (flow.Identifier, uint64) {
	return c.result.PreviousResultID, c.blockHeader.ParentView
}