wip - initial draft results forest #7398
base: master
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@              Coverage Diff              @@
##            master     #7398       +/-   ##
=============================================
- Coverage    42.75%    39.29%    -3.47%
=============================================
  Files         1591       111     -1480
  Lines       146192     13180   -133012
=============================================
- Hits         62507      5179    -57328
+ Misses       78345      7615    -70730
+ Partials      5340       386     -4954
Flags with carried forward coverage won't be shown.
Looks very good. I think this is going very much in the right direction. One high-level comment:
- I would suggest that you consolidate all the threading and orchestration in the Engine. The Engine decides when something is happening, and the ResultsForest implements what is happening (see the sketch below).
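To make the suggested split concrete, here is a minimal, hypothetical sketch (none of these names are from the PR; the actual Engine and ResultsForest APIs may differ, and the SignalerContext usage simply mirrors the ctx.Throw calls in the diff below): the Engine owns the notifier loop and the worker goroutines, while the ResultsForest only exposes synchronous operations.

// Hypothetical sketch only: the Engine decides *when* work happens,
// the ResultsForest implements *what* happens.
type pendingProvider interface {
	// NextPending returns a container whose pipeline is ready to run, if any.
	// Purely synchronous; no goroutines are spawned inside the forest.
	NextPending() (*ExecutionResultContainer, bool)
}

func (e *Engine) loop(ctx irrecoverable.SignalerContext) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-e.notifier.Channel():
			// drain all currently runnable pipelines
			for {
				container, ok := e.forest.NextPending()
				if !ok {
					break
				}
				e.startPipeline(ctx, container) // goroutine management lives in the Engine
			}
		}
	}
}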
wg := sync.WaitGroup{}
ch := f.notifier.Channel()
for {
	select {
	case <-ctx.Done():
		// wait for all pipelines to gracefully shutdown
		wg.Wait()
		return

	case <-ch:
		runningCount := uint(0)

		// Inspect all vertices in the tree forming from the latest persisted sealed result.
		// Count all running pipelines, and start new ones up to the maxRunningCount.
		// This will iterate over at most 2*maxRunningCount vertices.
		f.visitAllAncestorsBFS(f.latestPersistedSealedResult.ResultID(), func(container *ExecutionResultContainer) bool {
			state := container.pipeline.GetState()

			switch state {
			case pipeline.StateCanceled:
				// TODO: free the pipeline's resources (not necessarily here)
				return true

			case pipeline.StateComplete:
				if err := f.processCompleted(container.resultID); err != nil {
					ctx.Throw(err)
				}
				return true

			case pipeline.StatePending:
				wg.Add(1)
				go func() {
					defer wg.Done()

					core := pipeline.NewCore()
					err := container.pipeline.Run(ctx, core)
					if err != nil && !errors.Is(err, context.Canceled) {
						ctx.Throw(fmt.Errorf("pipeline execution failed (result: %s): %w", container.resultID, err))
					}
				}()
			}

			runningCount++
			return runningCount < f.maxRunningCount
		})
	}
}
Two high-level comments:
- that is thread maintenance logic. I feel it complicates the ResultsForest because it creates two domains: some work that the results forest is internally doing, vs. work that is done by external threads of the results forest.
- From my perspective, just encapsulating the business logic by itself provides a really nice abstraction of "elementary self-contained operations". In my head, it is the responsibility of the ResultsForest to provide those operations, while it is the responsibility of the Engine to provide the orchestration and worker threads so that the ResultsForest (in conjunction with the ForestLoader) is doing its job over time (see the sketch below).
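One way to read "elementary self-contained operations" (a hypothetical sketch; the method name CollectRunnable is not from the PR): the forest exposes a synchronous method that only inspects state and hands back work, leaving goroutine and WaitGroup management entirely to the Engine.

// Hypothetical sketch only: a synchronous forest operation with no internal threading.
// CollectRunnable walks the ancestors of the latest persisted sealed result and returns
// pipelines in StatePending, up to the given limit, counting already-running ones.
func (f *ResultsForest) CollectRunnable(limit uint) []*ExecutionResultContainer {
	f.mu.Lock()
	defer f.mu.Unlock()

	var runnable []*ExecutionResultContainer
	runningCount := uint(0)
	f.visitAllAncestorsBFS(f.latestPersistedSealedResult.ResultID(), func(container *ExecutionResultContainer) bool {
		switch container.pipeline.GetState() {
		case pipeline.StatePending:
			runnable = append(runnable, container)
		case pipeline.StateCanceled, pipeline.StateComplete:
			return true // not running and nothing to start; keep visiting
		}
		runningCount++
		return runningCount < limit
	})
	return runnable
}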
case <-ch:
	runningCount := uint(0)

	// Inspect all vertices in the tree forming from the latest persisted sealed result.
I am wondering what you think about the alternative approach:
- overall, I would prefer if that thread and work management lived in the Engine
- assume that when we call ResultsForest.AddResult we create a new task and add it to some queue. Then we have a worker pool with 20 workers pulling elements off that queue (sketched below).
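A rough sketch of that alternative (the task type and the Engine fields and methods shown here are hypothetical, not the PR's API): AddResult enqueues a task, and the Engine runs a fixed-size worker pool that drains the queue.

// Hypothetical sketch only: AddResult feeds a queue; the Engine owns the worker pool.
type pipelineTask struct {
	resultID flow.Identifier
	height   uint64 // useful later if the queue becomes height-ordered
}

func (e *Engine) startWorkerPool(ctx irrecoverable.SignalerContext, workers int) {
	for i := 0; i < workers; i++ { // e.g. 20 workers, as suggested
		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case task := <-e.taskQueue: // buffered channel fed by ResultsForest.AddResult
					e.runPipeline(ctx, task)
				}
			}
		}()
	}
}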
I'll look into this. I'd have to use a priority queue based on height, but that should be simple enough.
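For reference, a height-ordered priority queue is straightforward with the standard library's container/heap. This is a self-contained sketch: the heightTask type is a stand-in for whatever the real task would carry, and access would need to be guarded by a mutex since the heap itself is not thread-safe.

package main

import (
	"container/heap"
	"fmt"
)

// heightTask stands in for a queued pipeline task.
type heightTask struct {
	resultID string
	height   uint64
}

// taskHeap is a min-heap ordered by block height, so workers always pull the
// lowest-height pending result first.
type taskHeap []heightTask

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].height < h[j].height }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(heightTask)) }
func (h *taskHeap) Pop() any {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

func main() {
	h := &taskHeap{{"c", 30}, {"a", 10}, {"b", 20}}
	heap.Init(h)
	heap.Push(h, heightTask{"d", 5})
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(heightTask).resultID) // prints d, a, b, c
	}
}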
Run(context.Context, Core) error
GetState() State
SetSealed()
OnParentStateUpdated(State)
This is essentially carrying over information from one pipeline to another. I think that would be better placed in the ResultsForest. I guess all that is really relevant here is whether this pipeline is cancelled?
This is essentially carrying over information from one pipeline to another. I think that would be better placed in the ResultsForest.
This is called from the ResultsForest. I'm not sure what you mean.
I guess all that is really relevant here is whether this pipeline is cancelled?
Also whether the parent is in an active state, which is required to start downloading.
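A hypothetical sketch of that gating (the Pipeline fields, the transition logic, and the canStartDownload helper are assumptions, not the PR's actual implementation; only the method names and the three states come from the diff): the child records its parent's state and only begins downloading once the parent has progressed beyond pending, while a canceled parent cancels the child.

// Hypothetical sketch only: how a pipeline could react to its parent's state updates.
func (p *Pipeline) OnParentStateUpdated(parentState State) {
	p.mu.Lock()
	defer p.mu.Unlock()

	switch parentState {
	case StateCanceled:
		// the parent's fork is abandoned, so this pipeline can never be sealed; cancel it too
		p.state = StateCanceled
	default:
		// remember the parent's state; Run consults it before starting the download step
		p.parentState = parentState
	}
}

// canStartDownload reports whether the parent has progressed far enough for this
// pipeline to begin downloading execution data.
func (p *Pipeline) canStartDownload() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.parentState != StatePending && p.parentState != StateCanceled
}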
func (f *ResultsForest) processCompleted(resultID flow.Identifier) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	// first, ensure that the result ID is in the forest, otherwise the forest is in an inconsistent state
	container, ok := f.getContainer(resultID)
	if !ok {
		return fmt.Errorf("state update from unknown result vertex %s", resultID)
	}

	// next, ensure that this result descends from the latest persisted sealed result, otherwise
	// the forest is in an inconsistent state since persisting must be done sequentially
	if container.resultID != f.latestPersistedSealedResult.ResultID() {
		return fmt.Errorf("result %s does not match the latest persisted sealed result %s", container.resultID, f.latestPersistedSealedResult.ResultID())
	}

	// finally, prune the forest up to the latest persisted result's block view
	latestPersistedView := container.blockHeader.View
	err := f.pruneUpToView(latestPersistedView)
	if err != nil {
		return fmt.Errorf("failed to prune results forest (view: %d): %w", latestPersistedView, err)
	}
can we do this right after calling Run on the pipeline? 👇
flow-go/engine/access/ingestion2/results_forest.go
Lines 131 to 135 in 391632a
core := pipeline.NewCore()
err := container.pipeline.Run(ctx, core)
if err != nil && !errors.Is(err, context.Canceled) {
	ctx.Throw(fmt.Errorf("pipeline execution failed (result: %s): %w", container.resultID, err))
}
yea, that's a good point
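A hypothetical sketch of that adjustment, folded into the goroutine from the diff above (the StateComplete check and the exact error-handling shape are assumptions): processCompleted is invoked as soon as the pipeline finishes, instead of waiting for the next notifier tick.

// Hypothetical sketch only: prune immediately after the pipeline finishes.
go func() {
	defer wg.Done()

	core := pipeline.NewCore()
	if err := container.pipeline.Run(ctx, core); err != nil {
		if !errors.Is(err, context.Canceled) {
			ctx.Throw(fmt.Errorf("pipeline execution failed (result: %s): %w", container.resultID, err))
		}
		return
	}

	// only a fully completed pipeline should trigger pruning of the forest
	if container.pipeline.GetState() == pipeline.StateComplete {
		if err := f.processCompleted(container.resultID); err != nil {
			ctx.Throw(err)
		}
	}
}()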
Co-authored-by: Alexander Hentschel <[email protected]>
This is an early draft of the results forest and new access ingestion engine.