Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion sql/rowexec/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rowexec

import (
"fmt"
"runtime/trace"

"github.com/dolthub/go-mysql-server/sql"
Expand All @@ -35,13 +36,36 @@ type BaseBuilder struct {
override sql.NodeExecBuilder
}

// panicSafeExecBuilder wraps another sql.NodeExecBuilder and converts panics to errors,
// preventing server-wide crashes when integrator-provided builders panic.
type panicSafeExecBuilder struct {
inner sql.NodeExecBuilder
}

func newPanicSafeExecBuilder(inner sql.NodeExecBuilder) sql.NodeExecBuilder {
if inner == nil {
return nil
}
return &panicSafeExecBuilder{inner: inner}
}

func (p *panicSafeExecBuilder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (iter sql.RowIter, err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("exec builder panic: %v", rec)
iter = nil
}
}()
return p.inner.Build(ctx, n, r)
}

func (b *BaseBuilder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, error) {
defer trace.StartRegion(ctx, "ExecBuilder.Build").End()
return b.buildNodeExec(ctx, n, r)
}

func NewOverrideBuilder(override sql.NodeExecBuilder) sql.NodeExecBuilder {
return &BaseBuilder{override: override}
return &BaseBuilder{override: newPanicSafeExecBuilder(override)}
}

// FinalizeIters applies the final transformations on sql.RowIter before execution.
Expand Down
26 changes: 16 additions & 10 deletions sql/rowexec/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
}

fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema()))
fullRow[0] = row
if len(row) > 0 {
copy(fullRow[0:], row[:])
copy(fullRow, row)
}

// a merge join's first filter provides direction information
Expand All @@ -72,14 +71,21 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
}

var iter sql.RowIter = &mergeJoinIter{
left: l,
right: r,
filters: filters[1:],
cmp: cmp,
typ: j.Op,
fullRow: fullRow,
scopeLen: j.ScopeLen,
parentLen: len(row) - j.ScopeLen,
left: l,
right: r,
filters: filters[1:],
cmp: cmp,
typ: j.Op,
fullRow: fullRow,
scopeLen: j.ScopeLen,
// parentLen is the portion of the parent row beyond the current scope.
// Clamp to zero to avoid negative offsets when scopeLen > len(row).
parentLen: func() int {
if len(row) >= j.ScopeLen {
return len(row) - j.ScopeLen
}
return 0
}(),
leftRowLen: len(j.Left().Schema()),
rightRowLen: len(j.Right().Schema()),
isReversed: j.IsReversed,
Expand Down
8 changes: 5 additions & 3 deletions sql/rowexec/node_builder.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ func (b *BaseBuilder) buildNodeExec(ctx *sql.Context, n sql.Node, row sql.Row) (
var err error
if b.override != nil {
iter, err = b.override.Build(ctx, n, row)
}
if err != nil {
return nil, err
if err != nil {
// If the override fails, fall back to the default builder
iter = nil
err = nil
}
}
if iter == nil {
iter, err = b.buildNodeExecNoAnalyze(ctx, n, row)
Expand Down