Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
192987e
implement row2
Oct 1, 2025
7a2a4ec
better
Oct 1, 2025
993a688
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 2, 2025
6deaf9d
disable row2
Oct 10, 2025
da6b39c
reenable row2
Oct 10, 2025
c6d99b4
implement expr2 for filters
Oct 13, 2025
6dd0309
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 13, 2025
f9c35e8
reduce type asserts
Oct 13, 2025
ec8b4dc
split send and receive
Oct 13, 2025
84997e1
directly return rows
Oct 13, 2025
310942f
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 13, 2025
cd838a9
resplit
Oct 13, 2025
0c3be29
TODO
Oct 14, 2025
f59647c
small fixes
Oct 14, 2025
83c6d0f
don't use vitess type
Oct 15, 2025
baf2d77
include Wrapper values for out of band values
Oct 17, 2025
f9dbd77
rename to rowvalue
Oct 17, 2025
8262f83
refactoring and fixing tests
Oct 20, 2025
1ee3afe
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 20, 2025
20bd197
added collations
Oct 21, 2025
474cb6c
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 21, 2025
13e882b
refactor
Oct 21, 2025
15f6b87
fix types over wire
Oct 22, 2025
3fa454f
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 22, 2025
f3cf2a3
fix nulls
Oct 22, 2025
dbdeb21
refactor and comments
Oct 23, 2025
8f27277
clean up
Oct 23, 2025
e88631e
more refactoring
Oct 23, 2025
1464a66
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 23, 2025
ad8119a
clean up
Oct 23, 2025
47c8ad6
hide under feature flag
Oct 23, 2025
5e3aeb5
no feature flag
Oct 23, 2025
2c201b1
reset memory
Oct 23, 2025
86202f9
feedback
Oct 27, 2025
c98a84b
fix
Oct 27, 2025
d315428
comments
Oct 27, 2025
b79f660
a ton of conversions
Oct 29, 2025
a6e326a
more stuff
Oct 29, 2025
f0cc6c6
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 29, 2025
5c592d8
remove todo
Oct 29, 2025
12ca1fe
so many types
Oct 29, 2025
0386edc
limiting scope of changes
Oct 29, 2025
aa3dfc1
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 29, 2025
706a096
conversion tests
Oct 29, 2025
4caa38d
remove bad tests
Oct 30, 2025
87ac0f5
fix
Oct 30, 2025
74d03cf
asdf
Oct 30, 2025
9b5519b
fix panic
Oct 30, 2025
6904da1
decimal conversion tests
Oct 30, 2025
51d3e90
fix
Oct 30, 2025
aa0d6d0
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 30, 2025
13556b0
comparison microbenchmarks
Oct 30, 2025
31b3a45
[ga-format-pr] Run ./format_repo.sh to fix formatting
jycor Oct 30, 2025
2ed39f8
aaa
Oct 30, 2025
b9da4b4
opt
Oct 30, 2025
cbcb957
bytes from string
Oct 30, 2025
d5eb7d9
comment
Oct 31, 2025
21ed9ff
feedback
Oct 31, 2025
f6063a1
rest of feedback
Oct 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/strftime v1.0.4
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0
github.com/shopspring/decimal v1.3.1
github.com/shopspring/decimal v1.4.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.31.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
180 changes: 179 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ func (h *Handler) doQuery(
r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields)
} else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) {
r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf)
} else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) {
r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more)
} else {
r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf)
}
Expand Down Expand Up @@ -768,6 +770,149 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
return r, processedAtLeastOneBatch, nil
}

func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End()

eg, ctx := ctx.NewErrgroup()
pan2err := func(err *error) {
if recoveredPanic := recover(); recoveredPanic != nil {
wrappedErr := fmt.Errorf("handler caught panic: %v\n%s", recoveredPanic, debug.Stack())
*err = goerrors.Join(*err, wrappedErr)
}
}

// TODO: poll for closed connections should obviously also run even if
// we're doing something with an OK result or a single row result, etc.
// This should be in the caller.
pollCtx, cancelF := ctx.NewSubContext()
eg.Go(func() (err error) {
defer pan2err(&err)
return h.pollForClosedConnection(pollCtx, c)
})

// Default waitTime is one minute if there is no timeout configured, in which case
// it will loop to iterate again unless the socket died by the OS timeout or other problems.
// If there is a timeout, it will be enforced to ensure that Vitess has a chance to
// call Handler.CloseConnection()
waitTime := 1 * time.Minute
if h.readTimeout > 0 {
waitTime = h.readTimeout
}
timer := time.NewTimer(waitTime)
defer timer.Stop()

wg := sync.WaitGroup{}
wg.Add(2)

// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
// clean out rows that have already been spooled.
resetCallback := func(r *sqltypes.Result, more bool) error {
// A server-side cursor allows the caller to fetch results cached on the server-side,
// so if a cursor exists, we can't release the buffer memory yet.
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
defer buf.Reset()
}
return callback(r, more)
}

// TODO: send results instead of rows?
// Read rows from iter and send them off
var rowChan = make(chan sql.ValueRow, 512)
eg.Go(func() (err error) {
defer pan2err(&err)
defer wg.Done()
defer close(rowChan)
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
row, err := iter.NextValueRow(ctx)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
select {
case rowChan <- row:
case <-ctx.Done():
return nil
}
}
}
})

var res *sqltypes.Result
var processedAtLeastOneBatch bool
eg.Go(func() (err error) {
defer pan2err(&err)
defer cancelF()
defer wg.Done()
for {
if res == nil {
res = &sqltypes.Result{
Fields: resultFields,
Rows: make([][]sqltypes.Value, 0, rowsBatch),
}
}
if res.RowsAffected == rowsBatch {
if err := resetCallback(res, more); err != nil {
return err
}
res = nil
processedAtLeastOneBatch = true
continue
}

select {
case <-ctx.Done():
return context.Cause(ctx)
case <-timer.C:
if h.readTimeout != 0 {
// Cancel and return so Vitess can call the CloseConnection callback
ctx.GetLogger().Tracef("connection timeout")
return ErrRowTimeout.New()
}
case row, ok := <-rowChan:
if !ok {
return nil
}
resRow, err := RowValueToSQLValues(ctx, schema, row, buf)
if err != nil {
return err
}
ctx.GetLogger().Tracef("spooling result row %s", resRow)
res.Rows = append(res.Rows, resRow)
res.RowsAffected++
if !timer.Stop() {
<-timer.C
}
}
timer.Reset(waitTime)
}
})

// Close() kills this PID in the process list,
// wait until all rows have be sent over the wire
eg.Go(func() (err error) {
defer pan2err(&err)
wg.Wait()
return iter.Close(ctx)
})

err := eg.Wait()
if err != nil {
ctx.GetLogger().WithError(err).Warn("error running query")
if verboseErrorLogging {
fmt.Printf("Err: %+v", err)
}
return nil, false, err
}

return res, processedAtLeastOneBatch, nil
}

// See https://dev.mysql.com/doc/internals/en/status-flags.html
func setConnStatusFlags(ctx *sql.Context, c *mysql.Conn) error {
ok, err := isSessionAutocommit(ctx)
Expand Down Expand Up @@ -994,7 +1139,7 @@ func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interf
return typ.SQL(ctx, nil, val)
}
ret, err := typ.SQL(ctx, buf.Get(), val)
buf.Grow(ret.Len())
buf.Grow(ret.Len()) // TODO: shouldn't we check capacity beforehand?
return ret, err
}

Expand Down Expand Up @@ -1037,6 +1182,39 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express
return outVals, nil
}

func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, buf *sql.ByteBuffer) ([]sqltypes.Value, error) {
if len(sch) == 0 {
return []sqltypes.Value{}, nil
}
var err error
outVals := make([]sqltypes.Value, len(sch))
for i, col := range sch {
// TODO: remove this check once all Types implement this
valType, ok := col.Type.(sql.ValueType)
if !ok {
if row[i].IsNull() {
outVals[i] = sqltypes.NULL
continue
}
outVals[i] = sqltypes.MakeTrusted(row[i].Typ, row[i].Val)
continue
}
if buf == nil {
outVals[i], err = valType.SQLValue(ctx, row[i], nil)
if err != nil {
return nil, err
}
continue
}
outVals[i], err = valType.SQLValue(ctx, row[i], buf.Get())
if err != nil {
return nil, err
}
buf.Grow(outVals[i].Len())
}
return outVals, nil
}

func schemaToFields(ctx *sql.Context, s sql.Schema) []*querypb.Field {
charSetResults := ctx.GetCharacterSetResults()
fields := make([]*querypb.Field, len(s))
Expand Down
10 changes: 5 additions & 5 deletions sql/analyzer/replace_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ func replaceIdxSortHelper(ctx *sql.Context, scope *plan.Scope, node sql.Node, so
sortFields[i] = sortField
} else {
sameSortFields = false
col2, _ := col.(sql.Expression2)
valCol, _ := col.(sql.ValueExpression)
sortFields[i] = sql.SortField{
Column: col,
Column2: col2,
NullOrdering: sortField.NullOrdering,
Order: sortField.Order,
Column: col,
ValueExprColumn: valCol,
NullOrdering: sortField.NullOrdering,
Order: sortField.Order,
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions sql/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (l *lruCache) Dispose() {
}

type rowsCache struct {
memory Freeable
reporter Reporter
rows []Row
rows2 []Row2
memory Freeable
reporter Reporter
rows []Row
valueRows []ValueRow
}

func newRowsCache(memory Freeable, r Reporter) *rowsCache {
Expand All @@ -92,17 +92,17 @@ func (c *rowsCache) Add(row Row) error {

func (c *rowsCache) Get() []Row { return c.rows }

func (c *rowsCache) Add2(row2 Row2) error {
func (c *rowsCache) AddValueRow(row ValueRow) error {
if !releaseMemoryIfNeeded(c.reporter, c.memory.Free) {
return ErrNoMemoryAvailable.New()
}

c.rows2 = append(c.rows2, row2)
c.valueRows = append(c.valueRows, row)
return nil
}

func (c *rowsCache) Get2() []Row2 {
return c.rows2
func (c *rowsCache) GetValueRow() []ValueRow {
return c.valueRows
}

func (c *rowsCache) Dispose() {
Expand Down
12 changes: 2 additions & 10 deletions sql/convert_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package sql
import (
"fmt"

"github.com/dolthub/vitess/go/vt/proto/query"

"github.com/dolthub/go-mysql-server/sql/values"

"github.com/dolthub/vitess/go/vt/proto/query"
)

// ConvertToValue converts the interface to a sql value.
Expand Down Expand Up @@ -90,11 +90,3 @@ func ConvertToValue(v interface{}) (Value, error) {
return Value{}, fmt.Errorf("type %T not implemented", v)
}
}

func MustConvertToValue(v interface{}) Value {
ret, err := ConvertToValue(v)
if err != nil {
panic(err)
}
return ret
}
12 changes: 6 additions & 6 deletions sql/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,13 @@ func DebugString(nodeOrExpression interface{}) string {
panic(fmt.Sprintf("Expected sql.DebugString or fmt.Stringer for %T", nodeOrExpression))
}

// Expression2 is an experimental future interface alternative to Expression to provide faster access.
type Expression2 interface {
// ValueExpression is an experimental future interface alternative to Expression to provide faster access.
type ValueExpression interface {
Expression
// Eval2 evaluates the given row frame and returns a result.
Eval2(ctx *Context, row Row2) (Value, error)
// Type2 returns the expression type.
Type2() Type2
// EvalValue evaluates the given row frame and returns a result.
EvalValue(ctx *Context, row ValueRow) (Value, error)
// IsValueExpression indicates whether this expression and all its children support ValueExpression.
IsValueExpression() bool
}

var SystemVariables SystemVariableRegistry
Expand Down
Loading
Loading