Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(submit): simplified submit loop #1331

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 41 additions & 28 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,60 @@ func SubmitLoopInner(
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isLastBatchRecent must be removed if not used

createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
) error {
var (
pendingBytes = atomic.Uint64{}
trigger = uchannel.NewNudger() // used to avoid busy waiting (using cpu) on trigger thread
submitter = uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread
)
eg, ctx := errgroup.WithContext(ctx)

pendingBytes := atomic.Uint64{}
// 'trigger': this thread is responsible for waking up the submitter when enough data has been accumulated or time has passed
// if too much skew been accumulated, we back pressure block production
eg.Go(func() error {
ticker := time.NewTicker(maxBatchSubmitTime)
defer ticker.Stop()

trigger := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on trigger thread
submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread
pending := uint64(0)

eg.Go(func() error {
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
byTime := false
select {
case <-ctx.Done():
return nil
case n := <-bytesProduced:
pendingBytes.Add(uint64(n)) //nolint:gosec // bytes size is always positive
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
pending = pendingBytes.Add(uint64(n)) //nolint:gosec // bytes size is always positive
case <-ticker.C:
byTime = true
}

submitter.Nudge()
skewTime := batchSkewTime()
UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), skewTime)

byData := pending >= maxBatchSubmitBytes
bySkew := skewTime >= maxSkewTime

// check if submission is required by time or size
if byTime || bySkew || byData {
submitter.Nudge()
ticker.Reset(maxBatchSubmitTime)
}

// if the time between the last produced block and last submitted is greater than maxSkewTime we block here until we get a progress nudge from the submitter thread
if maxSkewTime < batchSkewTime() {
if bySkew {
select {
case <-ctx.Done():
return nil
case <-trigger.C:
}
}

}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time/10 (we used /10 to avoid waiting too much if submission is not required for t-maxBatchSubmitTime, but it maybe required before t) to check if submission is required even if no new blocks have been produced
ticker := time.NewTicker(maxBatchSubmitTime / 10)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
case <-submitter.C:
}

Expand All @@ -104,14 +117,8 @@ func SubmitLoopInner(

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0

lastSubmissionIsRecent := isLastBatchRecent(maxBatchSubmitTime)
maxDataNotExceeded := pending <= maxBatchSubmitBytes

if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
if ctx.Err() != nil {
return nil
}

nConsumed, err := createAndSubmitBatch(maxBatchSubmitBytes)
Expand All @@ -130,14 +137,20 @@ func SubmitLoopInner(
return err
}
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
// after new batch submitted we check the skew time to wake up 'trigger' thread and restart block production
skewTime := batchSkewTime()
if skewTime < maxSkewTime {
trigger.Nudge()
}
UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), skewTime)

logger.Debug("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", skewTime)

// after new batch submitted we wake up 'trigger' thread just in case it is blocked due to too much skew
trigger.Nudge()

// Check if we need to submit another batch
byData := pending >= maxBatchSubmitBytes
bySkew := maxSkewTime < skewTime

// check if submission is required by skew or data
if !(bySkew || byData) {
break
}
}
// update pendingBytes with non submitted block bytes after all pending batches have been submitted
pendingBytes.Store(pending)
Expand Down
Loading